Fixes column default coming from a sequence (#4914)

* Add user-defined sequence support for MX

* Remove default part when propagating to workers

* Fix ALTER TABLE with sequences for mx tables

* Clean up and add tests

* Propagate DROP SEQUENCE

* Removing function parts

* Propagate ALTER SEQUENCE

* Change sequence type before propagation & cleanup

* Revert "Propagate ALTER SEQUENCE"

This reverts commit 2bef64c5a29f4e7224a7f43b43b88e0133c65159.

* Ensure sequence is not used in a different column with different type

* Insert select tests

* Propagate rename sequence stmt

* Fix issue with group ID cache invalidation

* Add ALTER TABLE ALTER COLUMN TYPE .. precaution

* Fix attnum inconsistency and add various tests

* Add ALTER SEQUENCE precaution

* Remove Citus hook

* More tests

Co-authored-by: Marco Slot <marco.slot@gmail.com>
pull/5020/head
Naisila Puka 2021-06-03 23:02:09 +03:00 committed by GitHub
parent ec9664c5a4
commit 0f37ab5f85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 2656 additions and 109 deletions

View File

@ -204,7 +204,6 @@ static char * GetAccessMethodForMatViewIfExists(Oid viewOid);
static bool WillRecreateForeignKeyToReferenceTable(Oid relationId,
CascadeToColocatedOption cascadeOption);
static void WarningsForDroppingForeignKeysWithDistributedTables(Oid relationId);
static void ExecuteQueryViaSPI(char *query, int SPIOK);
PG_FUNCTION_INFO_V1(undistribute_table);
PG_FUNCTION_INFO_V1(alter_distributed_table);

View File

@ -115,7 +115,6 @@ static List * GetFKeyCreationCommandsRelationInvolvedWithTableType(Oid relationI
int tableTypeFlag);
static Oid DropFKeysAndUndistributeTable(Oid relationId);
static void DropFKeysRelationInvolvedWithTableType(Oid relationId, int tableTypeFlag);
static bool LocalTableEmpty(Oid tableId);
static void CopyLocalDataIntoShards(Oid relationId);
static List * TupleDescColumnNameList(TupleDesc tupleDescriptor);
static bool DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc,
@ -459,7 +458,7 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
EnsureReferenceTablesExistOnAllNodes();
/* we need to calculate these variables before creating distributed metadata */
bool localTableEmpty = LocalTableEmpty(relationId);
bool localTableEmpty = TableEmpty(relationId);
Oid colocatedTableId = ColocatedTableId(colocationId);
/* create an entry for distributed table in pg_dist_partition */
@ -1113,7 +1112,7 @@ static void
EnsureLocalTableEmpty(Oid relationId)
{
char *relationName = get_rel_name(relationId);
bool localTableEmpty = LocalTableEmpty(relationId);
bool localTableEmpty = TableEmpty(relationId);
if (!localTableEmpty)
{
@ -1249,50 +1248,40 @@ SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
/*
* LocalTableEmpty function checks whether given local table contains any row and
* returns false if there is any data. This function is only for local tables and
* should not be called for distributed tables.
* TableEmpty function checks whether given table contains any row and
* returns false if there is any data.
*/
static bool
LocalTableEmpty(Oid tableId)
bool
TableEmpty(Oid tableId)
{
Oid schemaId = get_rel_namespace(tableId);
char *schemaName = get_namespace_name(schemaId);
char *tableName = get_rel_name(tableId);
char *tableQualifiedName = quote_qualified_identifier(schemaName, tableName);
StringInfo selectExistQueryString = makeStringInfo();
StringInfo selectTrueQueryString = makeStringInfo();
bool columnNull = false;
bool readOnly = true;
int rowId = 0;
int attributeId = 1;
AssertArg(!IsCitusTable(tableId));
int spiConnectionResult = SPI_connect();
if (spiConnectionResult != SPI_OK_CONNECT)
{
ereport(ERROR, (errmsg("could not connect to SPI manager")));
}
appendStringInfo(selectExistQueryString, SELECT_EXIST_QUERY, tableQualifiedName);
appendStringInfo(selectTrueQueryString, SELECT_TRUE_QUERY, tableQualifiedName);
int spiQueryResult = SPI_execute(selectExistQueryString->data, readOnly, 0);
int spiQueryResult = SPI_execute(selectTrueQueryString->data, readOnly, 0);
if (spiQueryResult != SPI_OK_SELECT)
{
ereport(ERROR, (errmsg("execution was not successful \"%s\"",
selectExistQueryString->data)));
selectTrueQueryString->data)));
}
/* we expect that SELECT EXISTS query will return single value in a single row */
Assert(SPI_processed == 1);
/* we expect that SELECT TRUE query will return single value in a single row OR empty set */
Assert(SPI_processed == 1 || SPI_processed == 0);
HeapTuple tuple = SPI_tuptable->vals[rowId];
Datum hasDataDatum = SPI_getbinval(tuple, SPI_tuptable->tupdesc, attributeId,
&columnNull);
bool localTableEmpty = !DatumGetBool(hasDataDatum);
bool localTableEmpty = !SPI_processed;
SPI_finish();

View File

@ -366,6 +366,34 @@ static DistributeObjectOps Routine_AlterObjectDepends = {
.postprocess = NULL,
.address = AlterFunctionDependsStmtObjectAddress,
};
static DistributeObjectOps Sequence_Alter = {
.deparse = NULL,
.qualify = NULL,
.preprocess = PreprocessAlterSequenceStmt,
.postprocess = NULL,
.address = AlterSequenceObjectAddress,
};
static DistributeObjectOps Sequence_AlterObjectSchema = {
.deparse = NULL,
.qualify = NULL,
.preprocess = PreprocessAlterSequenceSchemaStmt,
.postprocess = NULL,
.address = AlterSequenceSchemaStmtObjectAddress,
};
static DistributeObjectOps Sequence_Drop = {
.deparse = DeparseDropSequenceStmt,
.qualify = NULL,
.preprocess = PreprocessDropSequenceStmt,
.postprocess = NULL,
.address = NULL,
};
static DistributeObjectOps Sequence_Rename = {
.deparse = DeparseRenameSequenceStmt,
.qualify = QualifyRenameSequenceStmt,
.preprocess = PreprocessRenameSequenceStmt,
.postprocess = NULL,
.address = RenameSequenceStmtObjectAddress,
};
static DistributeObjectOps Trigger_AlterObjectDepends = {
.deparse = NULL,
.qualify = NULL,
@ -460,7 +488,7 @@ static DistributeObjectOps Statistics_Rename = {
.address = NULL,
};
static DistributeObjectOps Table_AlterTable = {
.deparse = NULL,
.deparse = DeparseAlterTableStmt,
.qualify = NULL,
.preprocess = PreprocessAlterTableStmt,
.postprocess = NULL,
@ -628,6 +656,11 @@ GetDistributeObjectOps(Node *node)
return &Routine_AlterObjectSchema;
}
case OBJECT_SEQUENCE:
{
return &Sequence_AlterObjectSchema;
}
case OBJECT_STATISTIC_EXT:
{
return &Statistics_AlterObjectSchema;
@ -717,6 +750,11 @@ GetDistributeObjectOps(Node *node)
return &Any_AlterRoleSet;
}
case T_AlterSeqStmt:
{
return &Sequence_Alter;
}
#if PG_VERSION_NUM >= PG_VERSION_13
case T_AlterStatsStmt:
{
@ -873,6 +911,11 @@ GetDistributeObjectOps(Node *node)
return &Schema_Drop;
}
case OBJECT_SEQUENCE:
{
return &Sequence_Drop;
}
case OBJECT_STATISTIC_EXT:
{
return &Statistics_Drop;
@ -967,6 +1010,11 @@ GetDistributeObjectOps(Node *node)
return &Schema_Rename;
}
case OBJECT_SEQUENCE:
{
return &Sequence_Rename;
}
case OBJECT_STATISTIC_EXT:
{
return &Statistics_Rename;

View File

@ -1429,7 +1429,7 @@ PreprocessAlterFunctionSchemaStmt(Node *node, const char *queryString,
/*
* PreprocessAlterTypeOwnerStmt is called for change of owner ship of functions before the owner
* PreprocessAlterFunctionOwnerStmt is called for change of owner ship of functions before the owner
* ship is changed on the local instance.
*
* If the function for which the owner is changed is distributed we execute the change on

View File

@ -14,14 +14,20 @@
#include "catalog/dependency.h"
#include "catalog/namespace.h"
#include "commands/defrem.h"
#include "commands/extension.h"
#include "distributed/commands.h"
#include "distributed/commands/sequence.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "nodes/parsenodes.h"
/* Local functions forward declarations for helper functions */
static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId);
static bool ShouldPropagateAlterSequence(const ObjectAddress *address);
/*
@ -92,15 +98,6 @@ ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt)
errmsg("cannot alter OWNED BY option of a sequence "
"already owned by a distributed table")));
}
else if (!hasDistributedOwner && IsCitusTable(newOwnedByTableId))
{
/* and don't let local sequences get a distributed OWNED BY */
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot associate an existing sequence with a "
"distributed table"),
errhint("Use a sequence in a distributed table by specifying "
"a serial column type before creating any shards.")));
}
}
}
@ -209,3 +206,305 @@ ExtractDefaultColumnsAndOwnedSequences(Oid relationId, List **columnNameList,
relation_close(relation, NoLock);
}
/*
* PreprocessDropSequenceStmt gets called during the planning phase of a DROP SEQUENCE statement
* and returns a list of DDLJob's that will drop any distributed sequences from the
* workers.
*
* The DropStmt could have multiple objects to drop, the list of objects will be filtered
* to only keep the distributed sequences for deletion on the workers. Non-distributed
* sequences will still be dropped locally but not on the workers.
*/
List *
PreprocessDropSequenceStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
DropStmt *stmt = castNode(DropStmt, node);
List *deletingSequencesList = stmt->objects;
List *distributedSequencesList = NIL;
List *distributedSequenceAddresses = NIL;
Assert(stmt->removeType == OBJECT_SEQUENCE);
if (creating_extension)
{
/*
* extensions should be created separately on the workers, sequences cascading
* from an extension should therefor not be propagated here.
*/
return NIL;
}
if (!EnableDependencyCreation)
{
/*
* we are configured to disable object propagation, should not propagate anything
*/
return NIL;
}
/*
* Our statements need to be fully qualified so we can drop them from the right schema
* on the workers
*/
QualifyTreeNode((Node *) stmt);
/*
* iterate over all sequences to be dropped and filter to keep only distributed
* sequences.
*/
List *objectNameList = NULL;
foreach_ptr(objectNameList, deletingSequencesList)
{
RangeVar *seq = makeRangeVarFromNameList(objectNameList);
Oid seqOid = RangeVarGetRelid(seq, NoLock, stmt->missing_ok);
ObjectAddress sequenceAddress = { 0 };
ObjectAddressSet(sequenceAddress, RelationRelationId, seqOid);
if (!IsObjectDistributed(&sequenceAddress))
{
continue;
}
/* collect information for all distributed sequences */
ObjectAddress *addressp = palloc(sizeof(ObjectAddress));
*addressp = sequenceAddress;
distributedSequenceAddresses = lappend(distributedSequenceAddresses, addressp);
distributedSequencesList = lappend(distributedSequencesList, objectNameList);
}
if (list_length(distributedSequencesList) <= 0)
{
/* no distributed functions to drop */
return NIL;
}
/*
* managing types can only be done on the coordinator if ddl propagation is on. when
* it is off we will never get here. MX workers don't have a notion of distributed
* types, so we block the call.
*/
EnsureCoordinator();
/* remove the entries for the distributed objects on dropping */
ObjectAddress *address = NULL;
foreach_ptr(address, distributedSequenceAddresses)
{
UnmarkObjectDistributed(address);
}
/*
* Swap the list of objects before deparsing and restore the old list after. This
* ensures we only have distributed sequences in the deparsed drop statement.
*/
DropStmt *stmtCopy = copyObject(stmt);
stmtCopy->objects = distributedSequencesList;
stmtCopy->missing_ok = true;
const char *dropStmtSql = DeparseTreeNode((Node *) stmtCopy);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessRenameSequenceStmt is called when the user is renaming a sequence. The invocation
* happens before the statement is applied locally.
*
* As the sequence already exists we have access to the ObjectAddress, this is used to
* check if it is distributed. If so the rename is executed on all the workers to keep the
* types in sync across the cluster.
*/
List *
PreprocessRenameSequenceStmt(Node *node, const char *queryString, ProcessUtilityContext
processUtilityContext)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_SEQUENCE);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt,
stmt->missing_ok);
if (!ShouldPropagateAlterSequence(&address))
{
return NIL;
}
EnsureCoordinator();
QualifyTreeNode((Node *) stmt);
/* this takes care of cases where not all workers have synced metadata */
RenameStmt *stmtCopy = copyObject(stmt);
stmtCopy->missing_ok = true;
const char *sql = DeparseTreeNode((Node *) stmtCopy);
List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* RenameSequenceStmtObjectAddress returns the ObjectAddress of the sequence that is the
* subject of the RenameStmt.
*/
ObjectAddress
RenameSequenceStmtObjectAddress(Node *node, bool missing_ok)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_SEQUENCE);
RangeVar *sequence = stmt->relation;
Oid seqOid = RangeVarGetRelid(sequence, NoLock, missing_ok);
ObjectAddress sequenceAddress = { 0 };
ObjectAddressSet(sequenceAddress, RelationRelationId, seqOid);
return sequenceAddress;
}
/*
* ShouldPropagateAlterSequence returns, based on the address of a sequence, if alter
* statements targeting the function should be propagated.
*/
static bool
ShouldPropagateAlterSequence(const ObjectAddress *address)
{
if (creating_extension)
{
/*
* extensions should be created separately on the workers, sequences cascading
* from an extension should therefore not be propagated.
*/
return false;
}
if (!EnableDependencyCreation)
{
/*
* we are configured to disable object propagation, should not propagate anything
*/
return false;
}
if (!IsObjectDistributed(address))
{
/* do not propagate alter sequence for non-distributed sequences */
return false;
}
return true;
}
/*
* PreprocessAlterSequenceStmt gets called during the planning phase of an ALTER SEQUENCE statement
* of one of the following forms:
* ALTER SEQUENCE [ IF EXISTS ] name
* [ AS data_type ]
* [ INCREMENT [ BY ] increment ]
* [ MINVALUE minvalue | NO MINVALUE ] [ MAXVALUE maxvalue | NO MAXVALUE ]
* [ START [ WITH ] start ]
* [ RESTART [ [ WITH ] restart ] ]
* [ CACHE cache ] [ [ NO ] CYCLE ]
* [ OWNED BY { table_name.column_name | NONE } ]
*
* For distributed sequences, this operation will not be allowed for now.
* The reason is that we change sequence parameters when distributing it, so we don't want to
* touch those parameters for now.
*/
List *
PreprocessAlterSequenceStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterSeqStmt *stmt = castNode(AlterSeqStmt, node);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt,
stmt->missing_ok);
/* error out if the sequence is distributed */
if (IsObjectDistributed(&address))
{
ereport(ERROR, (errmsg(
"This operation is currently not allowed for a distributed sequence.")));
}
else
{
return NIL;
}
}
/*
* AlterSequenceOwnerObjectAddress returns the ObjectAddress of the sequence that is the
* subject of the AlterOwnerStmt.
*/
ObjectAddress
AlterSequenceObjectAddress(Node *node, bool missing_ok)
{
AlterSeqStmt *stmt = castNode(AlterSeqStmt, node);
RangeVar *sequence = stmt->sequence;
Oid seqOid = RangeVarGetRelid(sequence, NoLock, stmt->missing_ok);
ObjectAddress sequenceAddress = { 0 };
ObjectAddressSet(sequenceAddress, RelationRelationId, seqOid);
return sequenceAddress;
}
/*
* PreprocessAlterSequenceSchemaStmt is executed before the statement is applied to the local
* postgres instance.
*
* For distributed sequences, this operation will not be allowed for now.
*/
List *
PreprocessAlterSequenceSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_SEQUENCE);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt,
stmt->missing_ok);
/* error out if the sequence is distributed */
if (IsObjectDistributed(&address))
{
ereport(ERROR, (errmsg(
"This operation is currently not allowed for a distributed sequence.")));
}
else
{
return NIL;
}
}
/*
* AlterSequenceSchemaStmtObjectAddress returns the ObjectAddress of the sequence that is
* the subject of the AlterObjectSchemaStmt.
*/
ObjectAddress
AlterSequenceSchemaStmtObjectAddress(Node *node, bool missing_ok)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_SEQUENCE);
RangeVar *sequence = stmt->relation;
Oid seqOid = RangeVarGetRelid(sequence, NoLock, missing_ok);
ObjectAddress sequenceAddress = { 0 };
ObjectAddressSet(sequenceAddress, RelationRelationId, seqOid);
return sequenceAddress;
}

View File

@ -17,6 +17,7 @@
#include "catalog/pg_class.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_depend.h"
#include "catalog/pg_type.h"
#include "commands/tablecmds.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
@ -28,6 +29,7 @@
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/reference_table_utils.h"
@ -36,6 +38,7 @@
#include "distributed/version_compat.h"
#include "lib/stringinfo.h"
#include "nodes/parsenodes.h"
#include "parser/parse_expr.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@ -165,6 +168,8 @@ PreprocessDropTableStmt(Node *node, const char *queryString,
SendCommandToWorkersWithMetadata(detachPartitionCommand);
}
SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION);
}
return NIL;
@ -574,24 +579,50 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
ErrorIfUnsupportedAlterTableStmt(alterTableStatement);
}
EnsureCoordinator();
/* these will be set in below loop according to subcommands */
Oid rightRelationId = InvalidOid;
bool executeSequentially = false;
/*
* We check if there is a ADD/DROP FOREIGN CONSTRAINT command in sub commands
* list. If there is we assign referenced relation id to rightRelationId and
* we also set skip_validation to true to prevent PostgreSQL to verify validity
* of the foreign constraint in master. Validity will be checked in workers
* anyway.
* We check if there is:
* - an ADD/DROP FOREIGN CONSTRAINT command in sub commands
* list. If there is we assign referenced relation id to rightRelationId and
* we also set skip_validation to true to prevent PostgreSQL to verify validity
* of the foreign constraint in master. Validity will be checked in workers
* anyway.
* - an ADD COLUMN .. DEFAULT nextval('..') OR
* an ADD COLUMN .. SERIAL pseudo-type OR
* an ALTER COLUMN .. SET DEFAULT nextval('..'). If there is we set
* deparseAT variable to true which means we will deparse the statement
* before we propagate the command to shards. For shards, all the defaults
* coming from a user-defined sequence will be replaced by
* NOT NULL constraint.
*/
List *commandList = alterTableStatement->cmds;
/*
* if deparsing is needed, we will use a different version of the original
* alterTableStmt
*/
bool deparseAT = false;
bool propagateCommandToWorkers = true;
AlterTableStmt *newStmt = copyObject(alterTableStatement);
AlterTableCmd *newCmd = makeNode(AlterTableCmd);
AlterTableCmd *command = NULL;
foreach_ptr(command, commandList)
{
AlterTableType alterTableType = command->subtype;
/*
* if deparsing is needed, we will use a different version of the original
* AlterTableCmd
*/
newCmd = copyObject(command);
if (alterTableType == AT_AddConstraint)
{
Constraint *constraint = (Constraint *) command->def;
@ -666,6 +697,96 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
break;
}
}
/*
* We check for ADD COLUMN .. DEFAULT expr
* if expr contains nextval('user_defined_seq')
* we should deparse the statement
*/
constraint = NULL;
foreach_ptr(constraint, columnConstraints)
{
if (constraint->contype == CONSTR_DEFAULT)
{
if (constraint->raw_expr != NULL)
{
ParseState *pstate = make_parsestate(NULL);
Node *expr = transformExpr(pstate, constraint->raw_expr,
EXPR_KIND_COLUMN_DEFAULT);
if (contain_nextval_expression_walker(expr, NULL))
{
deparseAT = true;
/* the new column definition will have no constraint */
ColumnDef *newColDef = copyObject(columnDefinition);
newColDef->constraints = NULL;
newCmd->def = (Node *) newColDef;
}
}
}
}
/*
* We check for ADD COLUMN .. SERIAL pseudo-type
* if that's the case, we should deparse the statement
* The structure of this check is copied from transformColumnDefinition.
*/
if (columnDefinition->typeName && list_length(
columnDefinition->typeName->names) == 1 &&
!columnDefinition->typeName->pct_type)
{
char *typeName = strVal(linitial(columnDefinition->typeName->names));
if (strcmp(typeName, "smallserial") == 0 ||
strcmp(typeName, "serial2") == 0 ||
strcmp(typeName, "serial") == 0 ||
strcmp(typeName, "serial4") == 0 ||
strcmp(typeName, "bigserial") == 0 ||
strcmp(typeName, "serial8") == 0)
{
deparseAT = true;
ColumnDef *newColDef = copyObject(columnDefinition);
newColDef->is_not_null = false;
if (strcmp(typeName, "smallserial") == 0 ||
strcmp(typeName, "serial2") == 0)
{
newColDef->typeName->names = NIL;
newColDef->typeName->typeOid = INT2OID;
}
else if (strcmp(typeName, "serial") == 0 ||
strcmp(typeName, "serial4") == 0)
{
newColDef->typeName->names = NIL;
newColDef->typeName->typeOid = INT4OID;
}
else if (strcmp(typeName, "bigserial") == 0 ||
strcmp(typeName, "serial8") == 0)
{
newColDef->typeName->names = NIL;
newColDef->typeName->typeOid = INT8OID;
}
newCmd->def = (Node *) newColDef;
}
}
}
/*
* We check for ALTER COLUMN .. SET/DROP DEFAULT
* we should not propagate anything to shards
*/
else if (alterTableType == AT_ColumnDefault)
{
ParseState *pstate = make_parsestate(NULL);
Node *expr = transformExpr(pstate, command->def,
EXPR_KIND_COLUMN_DEFAULT);
if (contain_nextval_expression_walker(expr, NULL))
{
propagateCommandToWorkers = false;
}
}
else if (alterTableType == AT_AttachPartition)
{
@ -731,12 +852,20 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = leftRelationId;
ddlJob->concurrentIndexCmd = false;
const char *sqlForTaskList = alterTableCommand;
if (deparseAT)
{
newStmt->cmds = list_make1(newCmd);
sqlForTaskList = DeparseTreeNode((Node *) newStmt);
}
ddlJob->commandString = alterTableCommand;
if (OidIsValid(rightRelationId))
{
bool referencedIsLocalTable = !IsCitusTable(rightRelationId);
if (referencedIsLocalTable)
if (referencedIsLocalTable || !propagateCommandToWorkers)
{
ddlJob->taskList = NIL;
}
@ -744,13 +873,17 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
{
/* if foreign key related, use specialized task list function ... */
ddlJob->taskList = InterShardDDLTaskList(leftRelationId, rightRelationId,
alterTableCommand);
sqlForTaskList);
}
}
else
{
/* ... otherwise use standard DDL task list function */
ddlJob->taskList = DDLTaskList(leftRelationId, alterTableCommand);
ddlJob->taskList = DDLTaskList(leftRelationId, sqlForTaskList);
if (!propagateCommandToWorkers)
{
ddlJob->taskList = NIL;
}
}
List *ddlJobs = list_make1(ddlJob);
@ -1467,7 +1600,79 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
constraint);
}
}
/*
* We check for ADD COLUMN .. DEFAULT expr
* if expr contains nextval('user_defined_seq')
* we should make sure that the type of the column that uses
* that sequence is supported
*/
constraint = NULL;
foreach_ptr(constraint, columnConstraints)
{
if (constraint->contype == CONSTR_DEFAULT)
{
if (constraint->raw_expr != NULL)
{
ParseState *pstate = make_parsestate(NULL);
Node *expr = transformExpr(pstate, constraint->raw_expr,
EXPR_KIND_COLUMN_DEFAULT);
/*
* We should make sure that the type of the column that uses
* that sequence is supported
*/
if (contain_nextval_expression_walker(expr, NULL))
{
AttrNumber attnum = get_attnum(relationId,
columnDefinition->colname);
Oid seqTypId = GetAttributeTypeOid(relationId, attnum);
EnsureSequenceTypeSupported(relationId, attnum, seqTypId);
}
}
}
}
}
/*
* We check for ALTER COLUMN .. SET DEFAULT nextval('user_defined_seq')
* we should make sure that the type of the column that uses
* that sequence is supported
*/
else if (alterTableType == AT_ColumnDefault)
{
ParseState *pstate = make_parsestate(NULL);
Node *expr = transformExpr(pstate, command->def,
EXPR_KIND_COLUMN_DEFAULT);
if (contain_nextval_expression_walker(expr, NULL))
{
AttrNumber attnum = get_attnum(relationId, command->name);
Oid seqTypId = GetAttributeTypeOid(relationId, attnum);
EnsureSequenceTypeSupported(relationId, attnum, seqTypId);
}
}
}
/* for the new sequences coming with this ALTER TABLE statement */
if (ShouldSyncTableMetadata(relationId) && ClusterHasKnownMetadataWorkers())
{
List *sequenceCommandList = NIL;
/* commands to create sequences */
List *sequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
sequenceCommandList = list_concat(sequenceCommandList, sequenceDDLCommands);
/* prevent recursive propagation */
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
/* send the commands one by one */
const char *sequenceCommand = NULL;
foreach_ptr(sequenceCommand, sequenceCommandList)
{
SendCommandToWorkersWithMetadata(sequenceCommand);
}
SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION);
}
}
@ -1736,9 +1941,100 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
strcmp(typeName, "bigserial") == 0 ||
strcmp(typeName, "serial8") == 0)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot execute ADD COLUMN commands "
"involving serial pseudotypes")));
/*
* We currently don't support adding a serial column for an MX table
* TODO: record the dependency in the workers
*/
if (ShouldSyncTableMetadata(relationId) &&
ClusterHasKnownMetadataWorkers())
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"cannot execute ADD COLUMN commands involving serial"
" pseudotypes when metadata is synchronized to workers")));
}
/*
* we only allow adding a serial column if it is the only subcommand
* and it has no constraints
*/
if (commandList->length > 1 || column->constraints)
{
ereport(ERROR, (errcode(
ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"cannot execute ADD COLUMN commands involving "
"serial pseudotypes with other subcommands/constraints"),
errhint(
"You can issue each subcommand separately")));
}
/*
* Currently we don't support backfilling the new column with default values
* if the table is not empty
*/
if (!TableEmpty(relationId))
{
ereport(ERROR, (errcode(
ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"Cannot add a column involving serial pseudotypes "
"because the table is not empty"),
errhint(
"You can first call ALTER TABLE .. ADD COLUMN .. smallint/int/bigint\n"
"Then set the default by ALTER TABLE .. ALTER COLUMN .. SET DEFAULT nextval('..')")));
}
}
}
List *columnConstraints = column->constraints;
Constraint *constraint = NULL;
foreach_ptr(constraint, columnConstraints)
{
if (constraint->contype == CONSTR_DEFAULT)
{
if (constraint->raw_expr != NULL)
{
ParseState *pstate = make_parsestate(NULL);
Node *expr = transformExpr(pstate, constraint->raw_expr,
EXPR_KIND_COLUMN_DEFAULT);
if (contain_nextval_expression_walker(expr, NULL))
{
/*
* we only allow adding a column with non_const default
* if its the only subcommand and has no other constraints
*/
if (commandList->length > 1 ||
columnConstraints->length > 1)
{
ereport(ERROR, (errcode(
ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"cannot execute ADD COLUMN .. DEFAULT nextval('..')"
" command with other subcommands/constraints"),
errhint(
"You can issue each subcommand separately")));
}
/*
* Currently we don't support backfilling the new column with default values
* if the table is not empty
*/
if (!TableEmpty(relationId))
{
ereport(ERROR, (errcode(
ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"cannot add a column involving DEFAULT nextval('..') "
"because the table is not empty"),
errhint(
"You can first call ALTER TABLE .. ADD COLUMN .. smallint/int/bigint\n"
"Then set the default by ALTER TABLE .. ALTER COLUMN .. SET DEFAULT nextval('..')")));
}
}
}
}
}
}
@ -1746,9 +2042,67 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
break;
}
case AT_DropColumn:
case AT_ColumnDefault:
{
if (AlterInvolvesPartitionColumn(alterTableStatement, command))
{
ereport(ERROR, (errmsg("cannot execute ALTER TABLE command "
"involving partition column")));
}
ParseState *pstate = make_parsestate(NULL);
Node *expr = transformExpr(pstate, command->def,
EXPR_KIND_COLUMN_DEFAULT);
if (contain_nextval_expression_walker(expr, NULL))
{
/*
* we only allow altering a column's default to non_const expr
* if its the only subcommand
*/
if (commandList->length > 1)
{
ereport(ERROR, (errcode(
ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"cannot execute ALTER COLUMN COLUMN .. SET DEFAULT "
"nextval('..') command with other subcommands"),
errhint(
"You can issue each subcommand separately")));
}
}
break;
}
case AT_AlterColumnType:
{
if (AlterInvolvesPartitionColumn(alterTableStatement, command))
{
ereport(ERROR, (errmsg("cannot execute ALTER TABLE command "
"involving partition column")));
}
/*
* We check for ALTER COLUMN TYPE ...
* if the column has default coming from a user-defined sequence
* changing the type of the column should not be allowed for now
*/
AttrNumber attnum = get_attnum(relationId, command->name);
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(relationId, &attnumList,
&dependentSequenceList, attnum);
if (dependentSequenceList != NIL)
{
ereport(ERROR, (errmsg("cannot execute ALTER COLUMN TYPE .. command "
"because the column involves a default coming "
"from a sequence")));
}
break;
}
case AT_DropColumn:
case AT_DropNotNull:
{
if (AlterInvolvesPartitionColumn(alterTableStatement, command))

View File

@ -27,10 +27,12 @@
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_am.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_class.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_depend.h"
#include "catalog/pg_extension.h"
#include "catalog/pg_foreign_data_wrapper.h"
#include "catalog/pg_index.h"
@ -40,9 +42,10 @@
#include "distributed/citus_ruleutils.h"
#include "distributed/listutils.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/relay_utility.h"
#include "distributed/metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata_utility.h"
#include "distributed/relay_utility.h"
#include "distributed/version_compat.h"
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
@ -74,6 +77,8 @@ static void AppendStorageParametersToString(StringInfo stringBuffer,
List *optionList);
static void simple_quote_literal(StringInfo buf, const char *val);
static char * flatten_reloptions(Oid relid);
static Oid get_attrdef_oid(Oid relationId, AttrNumber attnum);
/*
* pg_get_extensiondef_string finds the foreign data wrapper that corresponds to
@ -365,6 +370,16 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults,
appendStringInfo(&buffer, " DEFAULT %s", defaultString);
}
}
/*
* We should make sure that the type of the column that uses
* that sequence is supported
*/
if (contain_nextval_expression_walker(defaultNode, NULL))
{
EnsureSequenceTypeSupported(tableRelationId, defaultValue->adnum,
attributeForm->atttypid);
}
}
/* if this column has a not null constraint, append the constraint */
@ -483,6 +498,126 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults,
}
/*
* EnsureSequenceTypeSupported ensures that the type of the column that uses
* a sequence on its DEFAULT is consistent with previous uses of the sequence (if any)
* It gets the AttrDefault OID from the given relationId and attnum, extracts the sequence
* id from it, and if any other distributed table uses that same sequence, it checks whether
* the types of the columns using the sequence match. If they don't, it errors out.
* Otherwise, the condition is ensured.
*/
void
EnsureSequenceTypeSupported(Oid relationId, AttrNumber attnum, Oid seqTypId)
{
/* get attrdefoid from the given relationId and attnum */
Oid attrdefOid = get_attrdef_oid(relationId, attnum);
/* retrieve the sequence id of the sequence found in nextval('seq') */
List *sequencesFromAttrDef = GetSequencesFromAttrDef(attrdefOid);
/* to simplify and eliminate cases like "DEFAULT nextval('..') - nextval('..')" */
if (list_length(sequencesFromAttrDef) > 1)
{
ereport(ERROR, (errmsg(
"More than one sequence in a column default"
" is not supported for distribution")));
}
Oid seqOid = lfirst_oid(list_head(sequencesFromAttrDef));
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
Oid citusTableId = InvalidOid;
foreach_oid(citusTableId, citusTableIdList)
{
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(citusTableId, &attnumList,
&dependentSequenceList, 0);
ListCell *attnumCell = NULL;
ListCell *dependentSequenceCell = NULL;
forboth(attnumCell, attnumList, dependentSequenceCell,
dependentSequenceList)
{
AttrNumber currentAttnum = lfirst_int(attnumCell);
Oid currentSeqOid = lfirst_oid(dependentSequenceCell);
/*
* If another distributed table is using the same sequence
* in one of its column defaults, make sure the types of the
* columns match
*/
if (currentSeqOid == seqOid)
{
Oid currentSeqTypId = GetAttributeTypeOid(citusTableId,
currentAttnum);
if (seqTypId != currentSeqTypId)
{
char *sequenceName = generate_qualified_relation_name(
seqOid);
char *citusTableName =
generate_qualified_relation_name(citusTableId);
ereport(ERROR, (errmsg(
"The sequence %s is already used for a different"
" type in column %d of the table %s",
sequenceName, currentAttnum,
citusTableName)));
}
}
}
}
}
/*
* get_attrdef_oid gets the oid of the attrdef that has dependency with
* the given relationId (refobjid) and attnum (refobjsubid).
* If there is no such attrdef it returns InvalidOid.
* NOTE: we are iterating pg_depend here since this function is used together
* with other functions that iterate pg_depend. Normally, a look at pg_attrdef
* would make more sense.
*/
static Oid
get_attrdef_oid(Oid relationId, AttrNumber attnum)
{
Oid resultAttrdefOid = InvalidOid;
ScanKeyData key[3];
Relation depRel = table_open(DependRelationId, AccessShareLock);
ScanKeyInit(&key[0],
Anum_pg_depend_refclassid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationRelationId));
ScanKeyInit(&key[1],
Anum_pg_depend_refobjid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(relationId));
ScanKeyInit(&key[2],
Anum_pg_depend_refobjsubid,
BTEqualStrategyNumber, F_INT4EQ,
Int32GetDatum(attnum));
SysScanDesc scan = systable_beginscan(depRel, DependReferenceIndexId, true,
NULL, attnum ? 3 : 2, key);
HeapTuple tup;
while (HeapTupleIsValid(tup = systable_getnext(scan)))
{
Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup);
if (deprec->classid == AttrDefaultRelationId)
{
resultAttrdefOid = deprec->objid;
}
}
systable_endscan(scan);
table_close(depRel, AccessShareLock);
return resultAttrdefOid;
}
/*
* EnsureRelationKindSupported errors out if the given relation is not supported
* as a distributed relation.

View File

@ -0,0 +1,158 @@
/*-------------------------------------------------------------------------
*
* deparse_sequence_stmts.c
*
* All routines to deparse sequence statements.
* This file contains all entry points specific for sequence statement
* deparsing
*
* Copyright (c), Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/namespace.h"
#include "distributed/deparser.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
/* forward declaration for deparse functions */
static void AppendDropSequenceStmt(StringInfo buf, DropStmt *stmt);
static void AppendSequenceNameList(StringInfo buf, List *objects, ObjectType objtype);
static void AppendRenameSequenceStmt(StringInfo buf, RenameStmt *stmt);
/*
* DeparseDropSequenceStmt builds and returns a string representing the DropStmt
*/
char *
DeparseDropSequenceStmt(Node *node)
{
DropStmt *stmt = castNode(DropStmt, node);
StringInfoData str = { 0 };
initStringInfo(&str);
Assert(stmt->removeType == OBJECT_SEQUENCE);
AppendDropSequenceStmt(&str, stmt);
return str.data;
}
/*
* AppendDropSequenceStmt appends a string representing the DropStmt to a buffer
*/
static void
AppendDropSequenceStmt(StringInfo buf, DropStmt *stmt)
{
appendStringInfoString(buf, "DROP SEQUENCE ");
if (stmt->missing_ok)
{
appendStringInfoString(buf, "IF EXISTS ");
}
AppendSequenceNameList(buf, stmt->objects, stmt->removeType);
if (stmt->behavior == DROP_CASCADE)
{
appendStringInfoString(buf, " CASCADE");
}
appendStringInfoString(buf, ";");
}
/*
* AppendSequenceNameList appends a string representing the list of sequence names to a buffer
*/
static void
AppendSequenceNameList(StringInfo buf, List *objects, ObjectType objtype)
{
ListCell *objectCell = NULL;
foreach(objectCell, objects)
{
if (objectCell != list_head(objects))
{
appendStringInfo(buf, ", ");
}
RangeVar *seq = makeRangeVarFromNameList((List *) lfirst(objectCell));
if (seq->schemaname == NULL)
{
Oid schemaOid = RangeVarGetCreationNamespace(seq);
seq->schemaname = get_namespace_name(schemaOid);
}
char *qualifiedSequenceName = quote_qualified_identifier(seq->schemaname,
seq->relname);
appendStringInfoString(buf, qualifiedSequenceName);
}
}
/*
* DeparseRenameSequenceStmt builds and returns a string representing the RenameStmt
*/
char *
DeparseRenameSequenceStmt(Node *node)
{
RenameStmt *stmt = castNode(RenameStmt, node);
StringInfoData str = { 0 };
initStringInfo(&str);
Assert(stmt->renameType == OBJECT_SEQUENCE);
AppendRenameSequenceStmt(&str, stmt);
return str.data;
}
/*
* AppendRenameSequenceStmt appends a string representing the RenameStmt to a buffer
*/
static void
AppendRenameSequenceStmt(StringInfo buf, RenameStmt *stmt)
{
RangeVar *seq = stmt->relation;
char *qualifiedSequenceName = quote_qualified_identifier(seq->schemaname,
seq->relname);
appendStringInfoString(buf, "ALTER SEQUENCE ");
if (stmt->missing_ok)
{
appendStringInfoString(buf, "IF EXISTS ");
}
appendStringInfoString(buf, qualifiedSequenceName);
appendStringInfo(buf, " RENAME TO %s", quote_identifier(stmt->newname));
}
/*
* QualifyRenameSequenceStmt transforms a
* ALTER SEQUENCE .. RENAME TO ..
* statement in place and makes the sequence name fully qualified.
*/
void
QualifyRenameSequenceStmt(Node *node)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_SEQUENCE);
RangeVar *seq = stmt->relation;
if (seq->schemaname == NULL)
{
Oid schemaOid = RangeVarGetCreationNamespace(seq);
seq->schemaname = get_namespace_name(schemaOid);
}
}

View File

@ -14,9 +14,13 @@
#include "distributed/deparser.h"
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "parser/parse_type.h"
#include "utils/builtins.h"
static void AppendAlterTableSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt);
static void AppendAlterTableStmt(StringInfo buf, AlterTableStmt *stmt);
static void AppendAlterTableCmd(StringInfo buf, AlterTableCmd *alterTableCmd);
static void AppendAlterTableCmdAddColumn(StringInfo buf, AlterTableCmd *alterTableCmd);
char *
DeparseAlterTableSchemaStmt(Node *node)
@ -46,3 +50,129 @@ AppendAlterTableSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt)
const char *newSchemaName = quote_identifier(stmt->newschema);
appendStringInfo(buf, "%s SET SCHEMA %s;", tableName, newSchemaName);
}
/*
* DeparseAlterTableStmt builds and returns a string representing the
* AlterTableStmt where the object acted upon is of kind OBJECT_TABLE
*/
char *
DeparseAlterTableStmt(Node *node)
{
AlterTableStmt *stmt = castNode(AlterTableStmt, node);
StringInfoData str = { 0 };
initStringInfo(&str);
Assert(stmt->relkind == OBJECT_TABLE);
AppendAlterTableStmt(&str, stmt);
return str.data;
}
/*
* AppendAlterTableStmt builds and returns an SQL command representing an
* ALTER TABLE statement from given AlterTableStmt object where the object
* acted upon is of kind OBJECT_TABLE
*/
static void
AppendAlterTableStmt(StringInfo buf, AlterTableStmt *stmt)
{
const char *identifier = quote_qualified_identifier(stmt->relation->schemaname,
stmt->relation->relname);
ListCell *cmdCell = NULL;
Assert(stmt->relkind == OBJECT_TABLE);
appendStringInfo(buf, "ALTER TABLE %s", identifier);
foreach(cmdCell, stmt->cmds)
{
if (cmdCell != list_head(stmt->cmds))
{
appendStringInfoString(buf, ", ");
}
AlterTableCmd *alterTableCmd = castNode(AlterTableCmd, lfirst(cmdCell));
AppendAlterTableCmd(buf, alterTableCmd);
}
appendStringInfoString(buf, ";");
}
/*
* AppendAlterTableCmd builds and appends to the given buffer a command
* from given AlterTableCmd object. Currently supported commands are of type
* AT_AddColumn and AT_SetNotNull
*/
static void
AppendAlterTableCmd(StringInfo buf, AlterTableCmd *alterTableCmd)
{
switch (alterTableCmd->subtype)
{
case AT_AddColumn:
{
AppendAlterTableCmdAddColumn(buf, alterTableCmd);
break;
}
default:
{
ereport(ERROR, (errmsg("unsupported subtype for alter table command"),
errdetail("sub command type: %d", alterTableCmd->subtype)));
}
}
}
/*
* AppendAlterTableCmd builds and appends to the given buffer an AT_AddColumn command
* from given AlterTableCmd object in the form ADD COLUMN ...
*/
static void
AppendAlterTableCmdAddColumn(StringInfo buf, AlterTableCmd *alterTableCmd)
{
Assert(alterTableCmd->subtype == AT_AddColumn);
appendStringInfoString(buf, " ADD COLUMN ");
ColumnDef *columnDefinition = (ColumnDef *) alterTableCmd->def;
/*
* the way we use the deparser now, constraints are always NULL
* adding this check for ColumnDef consistency
*/
if (columnDefinition->constraints != NULL)
{
ereport(ERROR, (errmsg("Constraints are not supported for AT_AddColumn")));
}
if (columnDefinition->colname)
{
appendStringInfo(buf, "%s ", quote_identifier(columnDefinition->colname));
}
int32 typmod = 0;
Oid typeOid = InvalidOid;
bits16 formatFlags = FORMAT_TYPE_TYPEMOD_GIVEN | FORMAT_TYPE_FORCE_QUALIFY;
typenameTypeIdAndMod(NULL, columnDefinition->typeName, &typeOid, &typmod);
appendStringInfo(buf, "%s", format_type_extended(typeOid, typmod,
formatFlags));
if (columnDefinition->is_not_null)
{
appendStringInfoString(buf, " NOT NULL");
}
/*
* the way we use the deparser now, collation is never used
* since the data type of columns that use sequences for default
* are only int,smallint and bigint (never text, varchar, char)
* Adding this part only for ColumnDef consistency
*/
Oid collationOid = GetColumnDefCollation(NULL, columnDefinition, typeOid);
if (OidIsValid(collationOid))
{
const char *identifier = FormatCollateBEQualified(collationOid);
appendStringInfo(buf, " COLLATE %s", identifier);
}
}

View File

@ -3344,8 +3344,7 @@ GetLocalGroupId(void)
return LocalGroupId;
}
Oid localGroupTableOid = get_relname_relid("pg_dist_local_group",
PG_CATALOG_NAMESPACE);
Oid localGroupTableOid = DistLocalGroupIdRelationId();
if (localGroupTableOid == InvalidOid)
{
return 0;

View File

@ -25,11 +25,13 @@
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_depend.h"
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
#include "commands/async.h"
#include "commands/sequence.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commands.h"
#include "distributed/deparser.h"
@ -46,10 +48,13 @@
#include "distributed/pg_dist_node.h"
#include "distributed/remote_commands.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
#include "distributed/version_compat.h"
#include "executor/spi.h"
#include "foreign/foreign.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/pg_list.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
@ -67,7 +72,6 @@ static List * GetDistributedTableDDLEvents(Oid relationId);
static char * LocalGroupIdUpdateCommand(int32 groupId);
static void UpdateDistNodeBoolAttr(const char *nodeName, int32 nodePort,
int attrNum, bool value);
static List * SequenceDDLCommandsForTable(Oid relationId);
static List * SequenceDependencyCommandList(Oid relationId);
static char * TruncateTriggerCreateCommand(Oid relationId);
static char * SchemaOwnerName(Oid objectId);
@ -374,11 +378,9 @@ MetadataCreateCommands(void)
continue;
}
List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
List *ddlCommandList = GetFullTableCreationCommands(relationId,
includeSequenceDefaults);
char *tableOwnerResetCommand = TableOwnerResetCommand(relationId);
List *sequenceDependencyCommandList = SequenceDependencyCommandList(relationId);
/*
* Tables might have dependencies on different objects, since we create shards for
@ -388,6 +390,7 @@ MetadataCreateCommands(void)
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
EnsureDependenciesExistOnAllNodes(&tableAddress);
List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
workerSequenceDDLCommands);
@ -402,6 +405,9 @@ MetadataCreateCommands(void)
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
tableOwnerResetCommand);
List *sequenceDependencyCommandList = SequenceDependencyCommandList(
relationId);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
sequenceDependencyCommandList);
}
@ -509,7 +515,8 @@ GetDistributedTableDDLEvents(Oid relationId)
}
/* command to associate sequences with table */
List *sequenceDependencyCommandList = SequenceDependencyCommandList(relationId);
List *sequenceDependencyCommandList = SequenceDependencyCommandList(
relationId);
commandList = list_concat(commandList, sequenceDependencyCommandList);
}
@ -1042,21 +1049,58 @@ List *
SequenceDDLCommandsForTable(Oid relationId)
{
List *sequenceDDLList = NIL;
List *ownedSequences = GetSequencesOwnedByRelation(relationId);
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(relationId, &attnumList, &dependentSequenceList, 0);
char *ownerName = TableOwner(relationId);
Oid sequenceOid = InvalidOid;
foreach_oid(sequenceOid, ownedSequences)
ListCell *attnumCell = NULL;
ListCell *dependentSequenceCell = NULL;
forboth(attnumCell, attnumList, dependentSequenceCell, dependentSequenceList)
{
AttrNumber attnum = lfirst_int(attnumCell);
Oid sequenceOid = lfirst_oid(dependentSequenceCell);
char *sequenceDef = pg_get_sequencedef_string(sequenceOid);
char *escapedSequenceDef = quote_literal_cstr(sequenceDef);
StringInfo wrappedSequenceDef = makeStringInfo();
StringInfo sequenceGrantStmt = makeStringInfo();
char *sequenceName = generate_qualified_relation_name(sequenceOid);
Form_pg_sequence sequenceData = pg_get_sequencedef(sequenceOid);
Oid sequenceTypeOid = sequenceData->seqtypid;
Oid sequenceTypeOid = GetAttributeTypeOid(relationId, attnum);
char *typeName = format_type_be(sequenceTypeOid);
/* get sequence address */
ObjectAddress sequenceAddress = { 0 };
ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid);
EnsureDependenciesExistOnAllNodes(&sequenceAddress);
/*
* Alter the sequence's data type in the coordinator if needed.
* A sequence's type is bigint by default and it doesn't change even if
* it's used in an int column. However, when distributing the sequence,
* we don't allow incompatible min/max ranges between the coordinator and
* workers, so we determine the sequence type here based on its current usage
* and propagate that same type to the workers as well.
* TODO: move this command to the part where the sequence is
* used in a distributed table: both in create_distributed_table
* and ALTER TABLE commands that include a sequence default
*/
Oid currentSequenceTypeOid = sequenceData->seqtypid;
if (currentSequenceTypeOid != sequenceTypeOid)
{
AlterSeqStmt *alterSequenceStatement = makeNode(AlterSeqStmt);
char *seqNamespace = get_namespace_name(get_rel_namespace(sequenceOid));
char *seqName = get_rel_name(sequenceOid);
alterSequenceStatement->sequence = makeRangeVar(seqNamespace, seqName, -1);
Node *asTypeNode = (Node *) makeTypeNameFromOid(sequenceTypeOid, -1);
SetDefElemArg(alterSequenceStatement, "as", asTypeNode);
ParseState *pstate = make_parsestate(NULL);
AlterSequence(pstate, alterSequenceStatement);
}
/* create schema if needed */
appendStringInfo(wrappedSequenceDef,
WORKER_APPLY_SEQUENCE_COMMAND,
@ -1069,12 +1113,184 @@ SequenceDDLCommandsForTable(Oid relationId)
sequenceDDLList = lappend(sequenceDDLList, wrappedSequenceDef->data);
sequenceDDLList = lappend(sequenceDDLList, sequenceGrantStmt->data);
MarkObjectDistributed(&sequenceAddress);
}
return sequenceDDLList;
}
/*
* GetAttributeTypeOid returns the OID of the type of the attribute of
* provided relationId that has the provided attnum
*/
Oid
GetAttributeTypeOid(Oid relationId, AttrNumber attnum)
{
Oid resultOid = InvalidOid;
ScanKeyData key[2];
/* Grab an appropriate lock on the pg_attribute relation */
Relation attrel = table_open(AttributeRelationId, AccessShareLock);
/* Use the index to scan only system attributes of the target relation */
ScanKeyInit(&key[0],
Anum_pg_attribute_attrelid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(relationId));
ScanKeyInit(&key[1],
Anum_pg_attribute_attnum,
BTLessEqualStrategyNumber, F_INT2LE,
Int16GetDatum(attnum));
SysScanDesc scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true, NULL, 2,
key);
HeapTuple attributeTuple;
while (HeapTupleIsValid(attributeTuple = systable_getnext(scan)))
{
Form_pg_attribute att = (Form_pg_attribute) GETSTRUCT(attributeTuple);
resultOid = att->atttypid;
}
systable_endscan(scan);
table_close(attrel, AccessShareLock);
return resultOid;
}
/*
* GetDependentSequencesWithRelation appends the attnum and id of sequences that
* have direct (owned sequences) or indirect dependency with the given relationId,
* to the lists passed as NIL initially.
* For both cases, we use the intermediate AttrDefault object from pg_depend.
* If attnum is specified, we only return the sequences related to that
* attribute of the relationId.
*/
void
GetDependentSequencesWithRelation(Oid relationId, List **attnumList,
List **dependentSequenceList, AttrNumber attnum)
{
Assert(*attnumList == NIL && *dependentSequenceList == NIL);
List *attrdefResult = NIL;
List *attrdefAttnumResult = NIL;
ScanKeyData key[3];
HeapTuple tup;
Relation depRel = table_open(DependRelationId, AccessShareLock);
ScanKeyInit(&key[0],
Anum_pg_depend_refclassid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationRelationId));
ScanKeyInit(&key[1],
Anum_pg_depend_refobjid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(relationId));
if (attnum)
{
ScanKeyInit(&key[2],
Anum_pg_depend_refobjsubid,
BTEqualStrategyNumber, F_INT4EQ,
Int32GetDatum(attnum));
}
SysScanDesc scan = systable_beginscan(depRel, DependReferenceIndexId, true,
NULL, attnum ? 3 : 2, key);
while (HeapTupleIsValid(tup = systable_getnext(scan)))
{
Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup);
if (deprec->classid == AttrDefaultRelationId &&
deprec->objsubid == 0 &&
deprec->refobjsubid != 0 &&
deprec->deptype == DEPENDENCY_AUTO)
{
attrdefResult = lappend_oid(attrdefResult, deprec->objid);
attrdefAttnumResult = lappend_int(attrdefAttnumResult, deprec->refobjsubid);
}
}
systable_endscan(scan);
table_close(depRel, AccessShareLock);
ListCell *attrdefOidCell = NULL;
ListCell *attrdefAttnumCell = NULL;
forboth(attrdefOidCell, attrdefResult, attrdefAttnumCell, attrdefAttnumResult)
{
Oid attrdefOid = lfirst_oid(attrdefOidCell);
AttrNumber attrdefAttnum = lfirst_int(attrdefAttnumCell);
List *sequencesFromAttrDef = GetSequencesFromAttrDef(attrdefOid);
/* to simplify and eliminate cases like "DEFAULT nextval('..') - nextval('..')" */
if (list_length(sequencesFromAttrDef) > 1)
{
ereport(ERROR, (errmsg("More than one sequence in a column default"
" is not supported for distribution")));
}
if (list_length(sequencesFromAttrDef) == 1)
{
*dependentSequenceList = list_concat(*dependentSequenceList,
sequencesFromAttrDef);
*attnumList = lappend_int(*attnumList, attrdefAttnum);
}
}
}
/*
* GetSequencesFromAttrDef returns a list of sequence OIDs that have
* dependency with the given attrdefOid in pg_depend
*/
List *
GetSequencesFromAttrDef(Oid attrdefOid)
{
List *sequencesResult = NIL;
ScanKeyData key[2];
HeapTuple tup;
Relation depRel = table_open(DependRelationId, AccessShareLock);
ScanKeyInit(&key[0],
Anum_pg_depend_classid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(AttrDefaultRelationId));
ScanKeyInit(&key[1],
Anum_pg_depend_objid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(attrdefOid));
SysScanDesc scan = systable_beginscan(depRel, DependDependerIndexId, true,
NULL, 2, key);
while (HeapTupleIsValid(tup = systable_getnext(scan)))
{
Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup);
if (deprec->refclassid == RelationRelationId &&
deprec->deptype == DEPENDENCY_NORMAL &&
get_rel_relkind(deprec->refobjid) == RELKIND_SEQUENCE)
{
sequencesResult = lappend_oid(sequencesResult, deprec->refobjid);
}
}
systable_endscan(scan);
table_close(depRel, AccessShareLock);
return sequencesResult;
}
/*
* SequenceDependencyCommandList generates commands to record the dependency
* of sequences on tables on the worker. This dependency does not exist by

View File

@ -67,7 +67,6 @@ static void CitusDeleteFile(const char *filename);
static bool check_log_statement(List *stmt_list);
static void AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequenceName,
Oid sequenceTypeId);
static void SetDefElemArg(AlterSeqStmt *statement, const char *name, Node *arg);
/* exports for SQL callable functions */
@ -738,12 +737,10 @@ AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequenceName,
if (sequenceTypeId == INT4OID)
{
valueBitLength = 28;
sequenceMaxValue = INT_MAX;
}
else if (sequenceTypeId == INT2OID)
{
valueBitLength = 12;
sequenceMaxValue = SHRT_MAX;
}
/* calculate min/max values that the sequence can generate in this worker */
@ -793,7 +790,7 @@ AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequenceName,
* If a DefElem with the given defname does not exist it is created and
* added to the AlterSeqStmt.
*/
static void
void
SetDefElemArg(AlterSeqStmt *statement, const char *name, Node *arg)
{
DefElem *defElem = NULL;

View File

@ -42,6 +42,7 @@ extern char * pg_get_indexclusterdef_string(Oid indexRelationId);
extern bool contain_nextval_expression_walker(Node *node, void *context);
extern char * pg_get_replica_identity_command(Oid tableRelationId);
extern const char * RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier);
extern void EnsureSequenceTypeSupported(Oid relationId, AttrNumber attnum, Oid seqTypId);
/* Function declarations for version dependent PostgreSQL ruleutils functions */
extern void pg_get_query_def(Query *query, StringInfo buffer);

View File

@ -342,6 +342,18 @@ extern List * PreprocessAlterSchemaRenameStmt(Node *node, const char *queryStrin
extern ObjectAddress AlterSchemaRenameStmtObjectAddress(Node *node, bool missing_ok);
/* sequence.c - forward declarations */
extern List * PreprocessAlterSequenceStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessAlterSequenceSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessDropSequenceStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessRenameSequenceStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern ObjectAddress AlterSequenceObjectAddress(Node *stmt, bool missing_ok);
extern ObjectAddress AlterSequenceSchemaStmtObjectAddress(Node *stmt, bool missing_ok);
extern ObjectAddress RenameSequenceStmtObjectAddress(Node *stmt, bool missing_ok);
extern void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt);
extern void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt);

View File

@ -48,6 +48,7 @@ extern void QualifyAlterCollationOwnerStmt(Node *stmt);
/* forward declarations for deparse_table_stmts.c */
extern char * DeparseAlterTableSchemaStmt(Node *stmt);
extern char * DeparseAlterTableStmt(Node *node);
extern void QualifyAlterTableSchemaStmt(Node *stmt);
@ -130,4 +131,9 @@ extern char * DeparseAlterExtensionStmt(Node *stmt);
/* forward declarations for deparse_database_stmts.c */
extern char * DeparseAlterDatabaseOwnerStmt(Node *node);
/* forward declarations for deparse_sequence_stmts.c */
extern char * DeparseDropSequenceStmt(Node *stmt);
extern char * DeparseRenameSequenceStmt(Node *stmt);
extern void QualifyRenameSequenceStmt(Node *stmt);
#endif /* CITUS_DEPARSER_H */

View File

@ -55,6 +55,13 @@ extern void SyncMetadataToNodesMain(Datum main_arg);
extern void SignalMetadataSyncDaemon(Oid database, int sig);
extern bool ShouldInitiateMetadataSync(bool *lockFailure);
extern List * SequenceDDLCommandsForTable(Oid relationId);
extern List * GetSequencesFromAttrDef(Oid attrdefOid);
extern void GetDependentSequencesWithRelation(Oid relationId, List **attnumList,
List **dependentSequenceList, AttrNumber
attnum);
extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum);
#define DELETE_ALL_NODES "TRUNCATE pg_dist_node CASCADE"
#define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \
"SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition"

View File

@ -30,7 +30,7 @@
/* total number of hash tokens (2^32) */
#define HASH_TOKEN_COUNT INT64CONST(4294967296)
#define SELECT_EXIST_QUERY "SELECT EXISTS (SELECT 1 FROM %s)"
#define SELECT_TRUE_QUERY "SELECT TRUE FROM %s LIMIT 1"
#define PG_TABLE_SIZE_FUNCTION "pg_table_size(%s)"
#define PG_RELATION_SIZE_FUNCTION "pg_relation_size(%s)"
#define PG_TOTAL_RELATION_SIZE_FUNCTION "pg_total_relation_size(%s)"
@ -267,6 +267,7 @@ extern void ErrorIfTableIsACatalogTable(Relation relation);
extern void EnsureTableNotDistributed(Oid relationId);
extern void EnsureRelationExists(Oid relationId);
extern bool RegularTable(Oid relationId);
extern bool TableEmpty(Oid tableId);
extern bool RelationUsesIdentityColumns(TupleDesc relationDesc);
extern char * ConstructQualifiedShardName(ShardInterval *shardInterval);
extern uint64 GetFirstShardId(Oid relationId);
@ -288,4 +289,5 @@ extern List * SendShardStatisticsQueriesInParallel(List *citusTableIds, bool
extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection,
uint64 *availableBytes,
uint64 *totalBytes);
extern void ExecuteQueryViaSPI(char *query, int SPIOK);
#endif /* METADATA_UTILITY_H */

View File

@ -123,6 +123,7 @@ extern int32 ArrayObjectCount(ArrayType *arrayObject);
extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedureId);
extern uint64 ExtractShardIdFromTableName(const char *tableName, bool missingOk);
extern void RepartitionCleanupJobDirectories(void);
extern void SetDefElemArg(AlterSeqStmt *statement, const char *name, Node *arg);
/* Function declarations shared with the master planner */

View File

@ -393,7 +393,7 @@ SELECT c.relname, a.amname FROM pg_class c, pg_am a where c.relname SIMILAR TO '
SELECT alter_table_set_access_method('table_type_dist', 'fake_am');
NOTICE: creating a new table for alter_table_set_access_method.table_type_dist
WARNING: fake_scan_getnextslot
CONTEXT: SQL statement "SELECT EXISTS (SELECT 1 FROM alter_table_set_access_method.table_type_dist_1533505599)"
CONTEXT: SQL statement "SELECT TRUE FROM alter_table_set_access_method.table_type_dist_1533505599 LIMIT 1"
WARNING: fake_scan_getnextslot
NOTICE: moving the data of alter_table_set_access_method.table_type_dist
NOTICE: dropping the old alter_table_set_access_method.table_type_dist
@ -406,7 +406,7 @@ NOTICE: renaming the new table to alter_table_set_access_method.table_type_dist
SELECT alter_table_set_access_method('table_type_ref', 'fake_am');
NOTICE: creating a new table for alter_table_set_access_method.table_type_ref
WARNING: fake_scan_getnextslot
CONTEXT: SQL statement "SELECT EXISTS (SELECT 1 FROM alter_table_set_access_method.table_type_ref_1037855087)"
CONTEXT: SQL statement "SELECT TRUE FROM alter_table_set_access_method.table_type_ref_1037855087 LIMIT 1"
WARNING: fake_scan_getnextslot
NOTICE: moving the data of alter_table_set_access_method.table_type_ref
NOTICE: dropping the old alter_table_set_access_method.table_type_ref
@ -695,7 +695,7 @@ SELECT alter_table_set_access_method('abcde_012345678901234567890123456789012345
DEBUG: the name of the shard (abcde_01234567890123456789012345678901234567890_f7ff6612_xxxxxx) for relation (abcde_012345678901234567890123456789012345678901234567890123456) is too long, switching to sequential and local execution mode to prevent self deadlocks
NOTICE: creating a new table for alter_table_set_access_method.abcde_012345678901234567890123456789012345678901234567890123456
DEBUG: pathlist hook for columnar table am
CONTEXT: SQL statement "SELECT EXISTS (SELECT 1 FROM alter_table_set_access_method.abcde_0123456789012345678901234567890123456_f7ff6612_4160710162)"
CONTEXT: SQL statement "SELECT TRUE FROM alter_table_set_access_method.abcde_0123456789012345678901234567890123456_f7ff6612_4160710162 LIMIT 1"
NOTICE: moving the data of alter_table_set_access_method.abcde_012345678901234567890123456789012345678901234567890123456
NOTICE: dropping the old alter_table_set_access_method.abcde_012345678901234567890123456789012345678901234567890123456
CONTEXT: SQL statement "DROP TABLE alter_table_set_access_method.abcde_012345678901234567890123456789012345678901234567890123456 CASCADE"

View File

@ -1254,17 +1254,43 @@ NOTICE: copying the data has completed
(1 row)
explain insert into table_with_sequences select y, x from table_with_sequences;
QUERY PLAN
explain (costs off) insert into table_with_sequences select y, x from table_with_sequences;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT) (cost=0.00..0.00 rows=0 width=0)
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> Custom Scan (Citus Adaptive) (cost=0.00..250.00 rows=100000 width=16)
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on table_with_sequences_4213648 table_with_sequences (cost=0.00..28.50 rows=1850 width=8)
-> Seq Scan on table_with_sequences_4213648 table_with_sequences
(8 rows)
-- verify that we don't report repartitioned insert/select for tables
-- with user-defined sequences.
CREATE SEQUENCE user_defined_sequence;
create table table_with_user_sequences (x int, y int, z bigint default nextval('user_defined_sequence'));
insert into table_with_user_sequences values (1,1);
select create_distributed_table('table_with_user_sequences','x');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
create_distributed_table
---------------------------------------------------------------------
(1 row)
explain (costs off) insert into table_with_user_sequences select y, x from table_with_user_sequences;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus INSERT ... SELECT)
INSERT/SELECT method: pull to coordinator
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on table_with_user_sequences_4213652 table_with_user_sequences
(8 rows)
-- clean-up

View File

@ -51,9 +51,17 @@ SELECT create_distributed_table('collections_list', 'key');
CREATE TABLE collections_list_0
PARTITION OF collections_list (key, collection_id)
FOR VALUES IN ( 0 );
DEBUG: relation "collections_list_key_seq" already exists, skipping
DETAIL: from localhost:xxxxx
DEBUG: relation "collections_list_key_seq" already exists, skipping
DETAIL: from localhost:xxxxx
CREATE TABLE collections_list_1
PARTITION OF collections_list (key, collection_id)
FOR VALUES IN ( 1 );
DEBUG: relation "collections_list_key_seq" already exists, skipping
DETAIL: from localhost:xxxxx
DEBUG: relation "collections_list_key_seq" already exists, skipping
DETAIL: from localhost:xxxxx
-- connection worker and get ready for the tests
\c - - - :worker_1_port
SET search_path TO local_shard_copy;

View File

@ -2212,7 +2212,7 @@ FROM
table_with_defaults
GROUP BY
store_id, first_name, last_name;
-- Volatile function in default should be disallowed
-- Volatile function in default should be disallowed - SERIAL pseudo-types
CREATE TABLE table_with_serial (
store_id int,
s bigserial
@ -2224,6 +2224,25 @@ SELECT create_distributed_table('table_with_serial', 'store_id');
(1 row)
INSERT INTO table_with_serial (store_id)
SELECT
store_id
FROM
table_with_defaults
GROUP BY
store_id;
-- Volatile function in default should be disallowed - user-defined sequence
CREATE SEQUENCE user_defined_sequence;
CREATE TABLE table_with_user_sequence (
store_id int,
s bigint default nextval('user_defined_sequence')
);
SELECT create_distributed_table('table_with_user_sequence', 'store_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table_with_user_sequence (store_id)
SELECT
store_id
FROM
@ -2719,6 +2738,92 @@ SELECT * FROM dist_table_with_sequence ORDER BY user_id, value_1;
(6 rows)
DROP TABLE dist_table_with_sequence;
-- Select into distributed table with a user-defined sequence
CREATE SEQUENCE seq1;
CREATE SEQUENCE seq2;
CREATE TABLE dist_table_with_user_sequence (user_id int default nextval('seq1'), value_1 bigint default nextval('seq2'));
SELECT create_distributed_table('dist_table_with_user_sequence', 'user_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- from local query
INSERT INTO dist_table_with_user_sequence (value_1)
SELECT s FROM generate_series(1,5) s;
SELECT * FROM dist_table_with_user_sequence ORDER BY user_id, value_1;
user_id | value_1
---------------------------------------------------------------------
1 | 1
2 | 2
3 | 3
4 | 4
5 | 5
(5 rows)
-- from a distributed query
INSERT INTO dist_table_with_user_sequence (value_1)
SELECT value_1 FROM dist_table_with_user_sequence ORDER BY value_1;
SELECT * FROM dist_table_with_user_sequence ORDER BY user_id, value_1;
user_id | value_1
---------------------------------------------------------------------
1 | 1
2 | 2
3 | 3
4 | 4
5 | 5
6 | 1
7 | 2
8 | 3
9 | 4
10 | 5
(10 rows)
TRUNCATE dist_table_with_user_sequence;
INSERT INTO dist_table_with_user_sequence (user_id)
SELECT user_id FROM raw_events_second ORDER BY user_id;
SELECT * FROM dist_table_with_user_sequence ORDER BY user_id, value_1;
user_id | value_1
---------------------------------------------------------------------
1 | 1
2 | 2
3 | 3
4 | 4
5 | 5
(5 rows)
WITH top10 AS (
SELECT user_id FROM raw_events_second WHERE value_1 IS NOT NULL ORDER BY value_1 LIMIT 10
)
INSERT INTO dist_table_with_user_sequence (value_1)
SELECT * FROM top10;
ERROR: cannot handle complex subqueries when the router executor is disabled
SELECT * FROM dist_table_with_user_sequence ORDER BY user_id, value_1;
user_id | value_1
---------------------------------------------------------------------
1 | 1
2 | 2
3 | 3
4 | 4
5 | 5
(5 rows)
-- router queries become logical planner queries when there is a nextval call
INSERT INTO dist_table_with_user_sequence (user_id)
SELECT user_id FROM dist_table_with_user_sequence WHERE user_id = 1;
SELECT * FROM dist_table_with_user_sequence ORDER BY user_id, value_1;
user_id | value_1
---------------------------------------------------------------------
1 | 1
1 | 6
2 | 2
3 | 3
4 | 4
5 | 5
(6 rows)
DROP TABLE dist_table_with_user_sequence;
DROP SEQUENCE seq1, seq2;
-- Select from distributed table into reference table
CREATE TABLE ref_table (user_id serial, value_1 int);
SELECT create_reference_table('ref_table');
@ -2783,6 +2888,72 @@ SELECT * FROM ref_table ORDER BY user_id, value_1;
(20 rows)
DROP TABLE ref_table;
-- Select from distributed table into reference table with user-defined sequence
CREATE SEQUENCE seq1;
CREATE TABLE ref_table_with_user_sequence (user_id int default nextval('seq1'), value_1 int);
SELECT create_reference_table('ref_table_with_user_sequence');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO ref_table_with_user_sequence
SELECT user_id, value_1 FROM raw_events_second;
SELECT * FROM ref_table_with_user_sequence ORDER BY user_id, value_1;
user_id | value_1
---------------------------------------------------------------------
1 |
2 |
3 |
4 |
5 |
(5 rows)
INSERT INTO ref_table_with_user_sequence (value_1)
SELECT value_1 FROM raw_events_second ORDER BY value_1;
SELECT * FROM ref_table_with_user_sequence ORDER BY user_id, value_1;
user_id | value_1
---------------------------------------------------------------------
1 |
1 |
2 |
2 |
3 |
3 |
4 |
4 |
5 |
5 |
(10 rows)
INSERT INTO ref_table_with_user_sequence SELECT * FROM ref_table_with_user_sequence;
SELECT * FROM ref_table_with_user_sequence ORDER BY user_id, value_1;
user_id | value_1
---------------------------------------------------------------------
1 |
1 |
1 |
1 |
2 |
2 |
2 |
2 |
3 |
3 |
3 |
3 |
4 |
4 |
4 |
4 |
5 |
5 |
5 |
5 |
(20 rows)
DROP TABLE ref_table_with_user_sequence;
DROP SEQUENCE seq1;
-- Select from reference table into reference table
CREATE TABLE ref1 (d timestamptz);
SELECT create_reference_table('ref1');
@ -3000,7 +3171,7 @@ FROM (
SELECT user_id, value_1
FROM coerce_events
) AS ftop;
ERROR: new row for relation "coerce_agg_13300060" violates check constraint "small_number_13300060"
ERROR: new row for relation "coerce_agg_13300067" violates check constraint "small_number_13300067"
\set VERBOSITY DEFAULT
SELECT * FROM coerce_agg ORDER BY 1 DESC, 2 DESC;
user_id | value_1_agg
@ -3076,6 +3247,8 @@ DROP TABLE reference_table;
DROP TABLE agg_events;
DROP TABLE table_with_defaults;
DROP TABLE table_with_serial;
DROP TABLE table_with_user_sequence;
DROP SEQUENCE user_defined_sequence;
DROP TABLE text_table;
DROP TABLE char_table;
DROP TABLE table_with_starts_with_defaults;

View File

@ -50,8 +50,9 @@ CREATE OR REPLACE FUNCTION pg_catalog.master_create_worker_shards(table_name tex
RETURNS void
AS 'citus', $$master_create_worker_shards$$
LANGUAGE C STRICT;
-- Create a test table with constraints and SERIAL
CREATE TABLE mx_test_table (col_1 int UNIQUE, col_2 text NOT NULL, col_3 BIGSERIAL);
-- Create a test table with constraints and SERIAL and default from user defined sequence
CREATE SEQUENCE user_defined_seq;
CREATE TABLE mx_test_table (col_1 int UNIQUE, col_2 text NOT NULL, col_3 BIGSERIAL, col_4 BIGINT DEFAULT nextval('user_defined_seq'));
SELECT master_create_distributed_table('mx_test_table', 'col_1', 'hash');
master_create_distributed_table
---------------------------------------------------------------------
@ -72,20 +73,22 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
unnest
---------------------------------------------------------------------
ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres
ALTER SEQUENCE public.user_defined_seq OWNER TO postgres
ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
ALTER TABLE public.mx_test_table OWNER TO postgres
ALTER TABLE public.mx_test_table OWNER TO postgres
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL)
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
SELECT pg_catalog.worker_record_sequence_dependency('public.mx_test_table_col_3_seq'::regclass,'public.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE','bigint')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE','bigint')
SELECT worker_create_truncate_trigger('public.mx_test_table')
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
TRUNCATE pg_dist_node CASCADE
(14 rows)
(16 rows)
-- Show that CREATE INDEX commands are included in the metadata snapshot
CREATE INDEX mx_index ON mx_test_table(col_2);
@ -93,21 +96,23 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
unnest
---------------------------------------------------------------------
ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres
ALTER SEQUENCE public.user_defined_seq OWNER TO postgres
ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
ALTER TABLE public.mx_test_table OWNER TO postgres
ALTER TABLE public.mx_test_table OWNER TO postgres
CREATE INDEX mx_index ON public.mx_test_table USING btree (col_2)
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL)
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
SELECT pg_catalog.worker_record_sequence_dependency('public.mx_test_table_col_3_seq'::regclass,'public.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE','bigint')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE','bigint')
SELECT worker_create_truncate_trigger('public.mx_test_table')
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
TRUNCATE pg_dist_node CASCADE
(15 rows)
(17 rows)
-- Show that schema changes are included in the metadata snapshot
CREATE SCHEMA mx_testing_schema;
@ -116,21 +121,23 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
unnest
---------------------------------------------------------------------
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
ALTER SEQUENCE public.user_defined_seq OWNER TO postgres
ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE','bigint')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE','bigint')
SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
TRUNCATE pg_dist_node CASCADE
(15 rows)
(17 rows)
-- Show that append distributed tables are not included in the metadata snapshot
CREATE TABLE non_mx_test_table (col_1 int, col_2 text);
@ -145,21 +152,23 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
unnest
---------------------------------------------------------------------
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
ALTER SEQUENCE public.user_defined_seq OWNER TO postgres
ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE','bigint')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE','bigint')
SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
TRUNCATE pg_dist_node CASCADE
(15 rows)
(17 rows)
-- Show that range distributed tables are not included in the metadata snapshot
UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass;
@ -167,21 +176,23 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
unnest
---------------------------------------------------------------------
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
ALTER SEQUENCE public.user_defined_seq OWNER TO postgres
ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE','bigint')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE','bigint')
SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
TRUNCATE pg_dist_node CASCADE
(15 rows)
(17 rows)
-- Test start_metadata_sync_to_node UDF
-- Ensure that hasmetadata=false for all nodes
@ -261,7 +272,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid;
(4 rows)
SELECT * FROM pg_dist_partition ORDER BY logicalrelid;
logicalrelid | partmethod | partkey | colocationid | repmodel
logicalrelid | partmethod | partkey | colocationid | repmodel
---------------------------------------------------------------------
mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 0 | s
(1 row)
@ -298,7 +309,8 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_testing_sch
col_1 | integer |
col_2 | text | not null
col_3 | bigint | not null default nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass)
(3 rows)
col_4 | bigint | default nextval('user_defined_seq'::regclass)
(4 rows)
SELECT "Column", "Type", "Definition" FROM index_attrs WHERE
relid = 'mx_testing_schema.mx_test_table_col_1_key'::regclass;
@ -397,7 +409,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid;
(4 rows)
SELECT * FROM pg_dist_partition ORDER BY logicalrelid;
logicalrelid | partmethod | partkey | colocationid | repmodel
logicalrelid | partmethod | partkey | colocationid | repmodel
---------------------------------------------------------------------
mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 0 | s
(1 row)
@ -434,7 +446,8 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_testing_sch
col_1 | integer |
col_2 | text | not null
col_3 | bigint | not null default nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass)
(3 rows)
col_4 | bigint | default nextval('user_defined_seq'::regclass)
(4 rows)
SELECT "Column", "Type", "Definition" FROM index_attrs WHERE
relid = 'mx_testing_schema.mx_test_table_col_1_key'::regclass;
@ -1596,6 +1609,145 @@ SELECT pg_reload_conf();
(1 row)
UPDATE pg_dist_node SET metadatasynced=true WHERE nodeport=:worker_1_port;
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
---------------------------------------------------------------------
7
(1 row)
SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
CREATE SEQUENCE mx_test_sequence_0;
CREATE SEQUENCE mx_test_sequence_1;
-- test create_distributed_table
CREATE TABLE test_table (id int DEFAULT nextval('mx_test_sequence_0'));
SELECT create_distributed_table('test_table', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- shouldn't work since it's partition column
ALTER TABLE test_table ALTER COLUMN id SET DEFAULT nextval('mx_test_sequence_1');
ERROR: cannot execute ALTER TABLE command involving partition column
-- test different plausible commands
ALTER TABLE test_table ADD COLUMN id2 int DEFAULT nextval('mx_test_sequence_1');
ALTER TABLE test_table ALTER COLUMN id2 DROP DEFAULT;
ALTER TABLE test_table ALTER COLUMN id2 SET DEFAULT nextval('mx_test_sequence_1');
SELECT unnest(master_metadata_snapshot()) order by 1;
unnest
---------------------------------------------------------------------
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
ALTER SEQUENCE public.mx_test_sequence_0 OWNER TO postgres
ALTER SEQUENCE public.mx_test_sequence_1 OWNER TO postgres
ALTER SEQUENCE public.user_defined_seq OWNER TO postgres
ALTER TABLE mx_test_schema_1.mx_table_1 ADD CONSTRAINT mx_fk_constraint_2 FOREIGN KEY (col1) REFERENCES mx_test_schema_2.mx_table_2(col1) NOT VALID
ALTER TABLE mx_test_schema_1.mx_table_1 ADD CONSTRAINT mx_table_1_col1_key UNIQUE (col1)
ALTER TABLE mx_test_schema_1.mx_table_1 OWNER TO postgres
ALTER TABLE mx_test_schema_1.mx_table_1 OWNER TO postgres
ALTER TABLE mx_test_schema_2.mx_table_2 ADD CONSTRAINT mx_fk_constraint FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1)
ALTER TABLE mx_test_schema_2.mx_table_2 ADD CONSTRAINT mx_table_2_col1_key UNIQUE (col1)
ALTER TABLE mx_test_schema_2.mx_table_2 OWNER TO postgres
ALTER TABLE mx_test_schema_2.mx_table_2 OWNER TO postgres
ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
ALTER TABLE public.dist_table_1 OWNER TO postgres
ALTER TABLE public.dist_table_1 OWNER TO postgres
ALTER TABLE public.mx_ref OWNER TO postgres
ALTER TABLE public.mx_ref OWNER TO postgres
ALTER TABLE public.test_table OWNER TO postgres
ALTER TABLE public.test_table OWNER TO postgres
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
CREATE INDEX mx_index_1 ON mx_test_schema_1.mx_table_1 USING btree (col1)
CREATE INDEX mx_index_2 ON mx_test_schema_2.mx_table_2 USING btree (col2)
CREATE TABLE mx_test_schema_1.mx_table_1 (col1 integer, col2 text, col3 integer)
CREATE TABLE mx_test_schema_2.mx_table_2 (col1 integer, col2 text)
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
CREATE TABLE public.dist_table_1 (a integer)
CREATE TABLE public.mx_ref (col_1 integer, col_2 text)
CREATE TABLE public.test_table (id integer DEFAULT nextval('public.mx_test_sequence_0'::regclass), id2 integer DEFAULT nextval('public.mx_test_sequence_1'::regclass))
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (4, 1, 'localhost', 8888, 'default', FALSE, FALSE, TRUE, 'secondary'::noderole, 'default'),(5, 1, 'localhost', 8889, 'default', FALSE, FALSE, TRUE, 'secondary'::noderole, 'second-cluster'),(1, 1, 'localhost', 57637, 'default', TRUE, TRUE, TRUE, 'primary'::noderole, 'default'),(7, 5, 'localhost', 57638, 'default', TRUE, TRUE, TRUE, 'primary'::noderole, 'default')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_test_schema_1.mx_table_1'::regclass, 'h', column_name_to_column('mx_test_schema_1.mx_table_1','col1'), 3, 's')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_test_schema_2.mx_table_2'::regclass, 'h', column_name_to_column('mx_test_schema_2.mx_table_2','col1'), 3, 's')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.dist_table_1'::regclass, 'h', column_name_to_column('public.dist_table_1','a'), 10004, 's')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_ref'::regclass, 'n', NULL, 10002, 't')
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.test_table'::regclass, 'h', column_name_to_column('public.test_table','id'), 10004, 's')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 5, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 5, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 5, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 5, 100007)
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310020, 1, 0, 1, 100020),(1310021, 1, 0, 5, 100021),(1310022, 1, 0, 1, 100022),(1310023, 1, 0, 5, 100023),(1310024, 1, 0, 1, 100024)
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310025, 1, 0, 1, 100025),(1310026, 1, 0, 5, 100026),(1310027, 1, 0, 1, 100027),(1310028, 1, 0, 5, 100028),(1310029, 1, 0, 1, 100029)
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310073, 1, 0, 1, 100074),(1310073, 1, 0, 5, 100075)
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310074, 1, 0, 1, 100076),(1310075, 1, 0, 5, 100077),(1310076, 1, 0, 1, 100078),(1310077, 1, 0, 5, 100079)
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310083, 1, 0, 1, 100086),(1310084, 1, 0, 5, 100087),(1310085, 1, 0, 1, 100088),(1310086, 1, 0, 5, 100089)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_test_schema_1.mx_table_1'::regclass, 1310020, 't', '-2147483648', '-1288490190'),('mx_test_schema_1.mx_table_1'::regclass, 1310021, 't', '-1288490189', '-429496731'),('mx_test_schema_1.mx_table_1'::regclass, 1310022, 't', '-429496730', '429496728'),('mx_test_schema_1.mx_table_1'::regclass, 1310023, 't', '429496729', '1288490187'),('mx_test_schema_1.mx_table_1'::regclass, 1310024, 't', '1288490188', '2147483647')
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_test_schema_2.mx_table_2'::regclass, 1310025, 't', '-2147483648', '-1288490190'),('mx_test_schema_2.mx_table_2'::regclass, 1310026, 't', '-1288490189', '-429496731'),('mx_test_schema_2.mx_table_2'::regclass, 1310027, 't', '-429496730', '429496728'),('mx_test_schema_2.mx_table_2'::regclass, 1310028, 't', '429496729', '1288490187'),('mx_test_schema_2.mx_table_2'::regclass, 1310029, 't', '1288490188', '2147483647')
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.dist_table_1'::regclass, 1310074, 't', '-2147483648', '-1073741825'),('public.dist_table_1'::regclass, 1310075, 't', '-1073741824', '-1'),('public.dist_table_1'::regclass, 1310076, 't', '0', '1073741823'),('public.dist_table_1'::regclass, 1310077, 't', '1073741824', '2147483647')
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_ref'::regclass, 1310073, 't', NULL, NULL)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.test_table'::regclass, 1310083, 't', '-2147483648', '-1073741825'),('public.test_table'::regclass, 1310084, 't', '-1073741824', '-1'),('public.test_table'::regclass, 1310085, 't', '0', '1073741823'),('public.test_table'::regclass, 1310086, 't', '1073741824', '2147483647')
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE','bigint')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_sequence_0 INCREMENT BY 1 MINVALUE 1 MAXVALUE 2147483647 START WITH 1 NO CYCLE','integer')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_sequence_1 INCREMENT BY 1 MINVALUE 1 MAXVALUE 2147483647 START WITH 1 NO CYCLE','integer')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE','bigint')
SELECT worker_create_truncate_trigger('mx_test_schema_1.mx_table_1')
SELECT worker_create_truncate_trigger('mx_test_schema_2.mx_table_2')
SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
SELECT worker_create_truncate_trigger('public.dist_table_1')
SELECT worker_create_truncate_trigger('public.mx_ref')
SELECT worker_create_truncate_trigger('public.test_table')
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
TRUNCATE pg_dist_node CASCADE
(62 rows)
-- shouldn't work since test_table is MX
ALTER TABLE test_table ADD COLUMN id3 bigserial;
ERROR: cannot execute ADD COLUMN commands involving serial pseudotypes when metadata is synchronized to workers
-- shouldn't work since the above operations should be the only subcommands
ALTER TABLE test_table ADD COLUMN id4 int DEFAULT nextval('mx_test_sequence_1') CHECK (id4 > 0);
ERROR: cannot execute ADD COLUMN .. DEFAULT nextval('..') command with other subcommands/constraints
HINT: You can issue each subcommand separately
ALTER TABLE test_table ADD COLUMN id4 int, ADD COLUMN id5 int DEFAULT nextval('mx_test_sequence_1');
ERROR: cannot execute ADD COLUMN .. DEFAULT nextval('..') command with other subcommands/constraints
HINT: You can issue each subcommand separately
ALTER TABLE test_table ALTER COLUMN id1 SET DEFAULT nextval('mx_test_sequence_1'), ALTER COLUMN id2 DROP DEFAULT;
ERROR: cannot execute ALTER COLUMN COLUMN .. SET DEFAULT nextval('..') command with other subcommands
HINT: You can issue each subcommand separately
ALTER TABLE test_table ADD COLUMN id4 bigserial CHECK (id4 > 0);
ERROR: cannot execute ADD COLUMN commands involving serial pseudotypes when metadata is synchronized to workers
\c - - - :worker_1_port
\ds
List of relations
Schema | Name | Type | Owner
---------------------------------------------------------------------
public | mx_test_sequence_0 | sequence | postgres
public | mx_test_sequence_1 | sequence | postgres
public | user_defined_seq | sequence | postgres
(3 rows)
\c - - - :master_port
CREATE SEQUENCE local_sequence;
-- verify that DROP SEQUENCE will propagate the command to workers for
-- the distributed sequences mx_test_sequence_0 and mx_test_sequence_1
DROP SEQUENCE mx_test_sequence_0, mx_test_sequence_1, local_sequence CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to default value for column id2 of table test_table
drop cascades to default value for column id of table test_table
\c - - - :worker_1_port
\ds
List of relations
Schema | Name | Type | Owner
---------------------------------------------------------------------
public | user_defined_seq | sequence | postgres
(1 row)
\c - - - :master_port
DROP TABLE test_table CASCADE;
-- Cleanup
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node

View File

@ -241,7 +241,9 @@ SELECT :worker_1_lastval = :worker_2_lastval;
-- the type of sequences can't be changed
ALTER TABLE mx_sequence ALTER value TYPE BIGINT;
ERROR: cannot execute ALTER COLUMN TYPE .. command because the column involves a default coming from a sequence
ALTER TABLE mx_sequence ALTER value TYPE INT;
ERROR: cannot execute ALTER COLUMN TYPE .. command because the column involves a default coming from a sequence
-- test distributed tables owned by extension
CREATE TABLE seg_test (x int);
INSERT INTO seg_test VALUES (42);

View File

@ -64,6 +64,14 @@ SELECT create_distributed_table('dist_table_1', 'a');
(1 row)
CREATE SEQUENCE sequence;
CREATE TABLE reference_table (a int default nextval('sequence'));
SELECT create_reference_table('reference_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- update the node
SELECT 1 FROM master_update_node((SELECT nodeid FROM pg_dist_node),
'localhost', :worker_2_port);
@ -804,7 +812,10 @@ SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
(0 rows)
-- cleanup
DROP SEQUENCE sequence CASCADE;
NOTICE: drop cascades to default value for column a of table reference_table
DROP TABLE ref_table;
DROP TABLE reference_table;
TRUNCATE pg_dist_colocation;
SELECT count(*) FROM (SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node) t;
count

View File

@ -24,7 +24,7 @@ INSERT INTO dest_table (a, b) VALUES (2, 1);
INSERT INTO source_table (a, b) VALUES (1, 5);
INSERT INTO source_table (a, b) VALUES (10, 10);
-- simulate actually having secondary nodes
SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node;
SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node ORDER BY 1, 2;
nodeid | groupid | nodename | nodeport | noderack | isactive | noderole | nodecluster
---------------------------------------------------------------------
1 | 1 | localhost | 57637 | default | t | primary | default

View File

@ -18,7 +18,7 @@ SELECT * FROM pg_dist_shard WHERE logicalrelid='source_table_xyz'::regclass::oid
source_table_xyz | 4213582 | t | (25,z) | (49,z)
(2 rows)
SELECT shardid, nodename, nodeport FROM pg_dist_shard_placement WHERE EXISTS(SELECT shardid FROM pg_dist_shard WHERE shardid=pg_dist_shard_placement.shardid AND logicalrelid='source_table_xyz'::regclass::oid);
SELECT shardid, nodename, nodeport FROM pg_dist_shard_placement WHERE EXISTS(SELECT shardid FROM pg_dist_shard WHERE shardid=pg_dist_shard_placement.shardid AND logicalrelid='source_table_xyz'::regclass::oid) ORDER BY 1, 2, 3;
shardid | nodename | nodeport
---------------------------------------------------------------------
4213581 | localhost | 57637

View File

@ -0,0 +1,389 @@
--
-- MULTI_SEQUENCE_DEFAULT
--
-- Tests related to column defaults coming from a sequence
--
SET citus.next_shard_id TO 890000;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
-- Cannot add a column involving DEFAULT nextval('..') because the table is not empty
CREATE SEQUENCE seq_0;
CREATE TABLE seq_test_0 (x int, y int);
SELECT create_distributed_table('seq_test_0','x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO seq_test_0 SELECT 1, s FROM generate_series(1, 50) s;
ALTER TABLE seq_test_0 ADD COLUMN z int DEFAULT nextval('seq_0');
ERROR: cannot add a column involving DEFAULT nextval('..') because the table is not empty
HINT: You can first call ALTER TABLE .. ADD COLUMN .. smallint/int/bigint
Then set the default by ALTER TABLE .. ALTER COLUMN .. SET DEFAULT nextval('..')
ALTER TABLE seq_test_0 ADD COLUMN z serial;
ERROR: Cannot add a column involving serial pseudotypes because the table is not empty
HINT: You can first call ALTER TABLE .. ADD COLUMN .. smallint/int/bigint
Then set the default by ALTER TABLE .. ALTER COLUMN .. SET DEFAULT nextval('..')
-- follow hint
ALTER TABLE seq_test_0 ADD COLUMN z int;
ALTER TABLE seq_test_0 ALTER COLUMN z SET DEFAULT nextval('seq_0');
SELECT * FROM seq_test_0 ORDER BY 1, 2 LIMIT 5;
x | y | z
---------------------------------------------------------------------
1 | 1 |
1 | 2 |
1 | 3 |
1 | 4 |
1 | 5 |
(5 rows)
\d seq_test_0
Table "public.seq_test_0"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
x | integer | | |
y | integer | | |
z | integer | | | nextval('seq_0'::regclass)
-- check that we can add serial pseudo-type columns
-- when metadata is not yet synced to workers
TRUNCATE seq_test_0;
ALTER TABLE seq_test_0 ADD COLUMN w00 smallserial;
ALTER TABLE seq_test_0 ADD COLUMN w01 serial2;
ALTER TABLE seq_test_0 ADD COLUMN w10 serial;
ALTER TABLE seq_test_0 ADD COLUMN w11 serial4;
ALTER TABLE seq_test_0 ADD COLUMN w20 bigserial;
ALTER TABLE seq_test_0 ADD COLUMN w21 serial8;
-- check alter column type precaution
ALTER TABLE seq_test_0 ALTER COLUMN z TYPE bigint;
ERROR: cannot execute ALTER COLUMN TYPE .. command because the column involves a default coming from a sequence
ALTER TABLE seq_test_0 ALTER COLUMN z TYPE smallint;
ERROR: cannot execute ALTER COLUMN TYPE .. command because the column involves a default coming from a sequence
-- MX tests
-- check that there's not problem with group ID cache
CREATE TABLE seq_test_4 (x int, y int);
SELECT create_distributed_table('seq_test_4','x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE SEQUENCE seq_4;
ALTER TABLE seq_test_4 ADD COLUMN a int DEFAULT nextval('seq_4');
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
DROP SEQUENCE seq_4 CASCADE;
NOTICE: drop cascades to default value for column a of table seq_test_4
TRUNCATE seq_test_4;
CREATE SEQUENCE seq_4;
ALTER TABLE seq_test_4 ADD COLUMN b int DEFAULT nextval('seq_4');
-- on worker it should generate high sequence number
\c - - - :worker_1_port
INSERT INTO seq_test_4 VALUES (1,2) RETURNING *;
x | y | a | b
---------------------------------------------------------------------
1 | 2 | | 268435457
(1 row)
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- check sequence type consistency in all nodes
CREATE SEQUENCE seq_1;
-- type is bigint by default
\d seq_1
Sequence "public.seq_1"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
bigint | 1 | 1 | 9223372036854775807 | 1 | no | 1
CREATE TABLE seq_test_1 (x int, y int);
SELECT create_distributed_table('seq_test_1','x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER TABLE seq_test_1 ADD COLUMN z int DEFAULT nextval('seq_1');
-- type is changed to int
\d seq_1
Sequence "public.seq_1"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
integer | 1 | 1 | 2147483647 | 1 | no | 1
-- check insertion is within int bounds in the worker
\c - - - :worker_1_port
INSERT INTO seq_test_1 values (1, 2) RETURNING *;
x | y | z
---------------------------------------------------------------------
1 | 2 | 268435457
(1 row)
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- check that we cannot add serial pseudo-type columns
-- when metadata is synced to workers
ALTER TABLE seq_test_1 ADD COLUMN w bigserial;
ERROR: cannot execute ADD COLUMN commands involving serial pseudotypes when metadata is synchronized to workers
-- check for sequence type clashes
CREATE SEQUENCE seq_2;
CREATE TABLE seq_test_2 (x int, y bigint DEFAULT nextval('seq_2'));
-- should work
SELECT create_distributed_table('seq_test_2','x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
DROP TABLE seq_test_2;
CREATE TABLE seq_test_2 (x int, y int DEFAULT nextval('seq_2'));
-- should work
SELECT create_distributed_table('seq_test_2','x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE seq_test_2_0(x int, y smallint DEFAULT nextval('seq_2'));
-- shouldn't work
SELECT create_distributed_table('seq_test_2_0','x');
ERROR: The sequence public.seq_2 is already used for a different type in column 2 of the table public.seq_test_2
DROP TABLE seq_test_2;
DROP TABLE seq_test_2_0;
-- should work
CREATE TABLE seq_test_2 (x int, y bigint DEFAULT nextval('seq_2'));
SELECT create_distributed_table('seq_test_2','x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
DROP TABLE seq_test_2;
CREATE TABLE seq_test_2 (x int, y int DEFAULT nextval('seq_2'), z bigint DEFAULT nextval('seq_2'));
-- shouldn't work
SELECT create_distributed_table('seq_test_2','x');
ERROR: The sequence public.seq_2 is already used for a different type in column 3 of the table public.seq_test_2
-- check rename is propagated properly
ALTER SEQUENCE seq_2 RENAME TO sequence_2;
-- check in the worker
\c - - - :worker_1_port
\d sequence_2
Sequence "public.sequence_2"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
bigint | 281474976710657 | 281474976710657 | 562949953421313 | 1 | no | 1
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- check rename with another schema
-- we notice that schema is also propagated as one of the sequence's dependencies
CREATE SCHEMA sequence_default_0;
SET search_path TO public, sequence_default_0;
CREATE SEQUENCE sequence_default_0.seq_3;
CREATE TABLE seq_test_3 (x int, y bigint DEFAULT nextval('seq_3'));
SELECT create_distributed_table('seq_test_3', 'x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER SEQUENCE sequence_default_0.seq_3 RENAME TO sequence_3;
-- check in the worker
\c - - - :worker_1_port
\d sequence_default_0.sequence_3
Sequence "sequence_default_0.sequence_3"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
bigint | 281474976710657 | 281474976710657 | 562949953421313 | 1 | no | 1
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
DROP SEQUENCE sequence_default_0.sequence_3 CASCADE;
NOTICE: drop cascades to default value for column y of table seq_test_3
DROP SCHEMA sequence_default_0;
-- DROP SCHEMA problem: expected since we don't propagate DROP SCHEMA
CREATE TABLE seq_test_5 (x int, y int);
SELECT create_distributed_table('seq_test_5','x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE SCHEMA sequence_default_1;
CREATE SEQUENCE sequence_default_1.seq_5;
ALTER TABLE seq_test_5 ADD COLUMN a int DEFAULT nextval('sequence_default_1.seq_5');
DROP SCHEMA sequence_default_1 CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to sequence sequence_default_1.seq_5
drop cascades to default value for column a of table seq_test_5
-- sequence is gone from coordinator
INSERT INTO seq_test_5 VALUES (1, 2) RETURNING *;
x | y | a
---------------------------------------------------------------------
1 | 2 |
(1 row)
-- but is still present on worker
\c - - - :worker_1_port
INSERT INTO seq_test_5 VALUES (1, 2) RETURNING *;
x | y | a
---------------------------------------------------------------------
1 | 2 | 268435457
(1 row)
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- apply workaround
SELECT run_command_on_workers('DROP SCHEMA sequence_default_1 CASCADE');
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"DROP SCHEMA")
(localhost,57638,t,"DROP SCHEMA")
(2 rows)
-- now the sequence is gone from the worker as well
\c - - - :worker_1_port
INSERT INTO seq_test_5 VALUES (1, 2) RETURNING *;
x | y | a
---------------------------------------------------------------------
1 | 2 |
(1 row)
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- check some more complex cases
CREATE SEQUENCE seq_6;
CREATE TABLE seq_test_6 (x int, t timestamptz DEFAULT now(), s int DEFAULT nextval('seq_6'), m int) PARTITION BY RANGE (t);
SELECT create_distributed_table('seq_test_6','x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- shouldn't work since x is the partition column
ALTER TABLE seq_test_6 ALTER COLUMN x SET DEFAULT nextval('seq_6');
ERROR: cannot execute ALTER TABLE command involving partition column
-- should work since both s and m have int type
ALTER TABLE seq_test_6 ALTER COLUMN m SET DEFAULT nextval('seq_6');
-- It is possible for a partition to have a different DEFAULT than its parent
CREATE SEQUENCE seq_7;
CREATE TABLE seq_test_7 (x text, s bigint DEFAULT nextval('seq_7'), t timestamptz DEFAULT now()) PARTITION BY RANGE (t);
SELECT create_distributed_table('seq_test_7','x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE SEQUENCE seq_7_par;
CREATE TABLE seq_test_7_par (x text, s bigint DEFAULT nextval('seq_7_par'), t timestamptz DEFAULT now());
ALTER TABLE seq_test_7 ATTACH PARTITION seq_test_7_par FOR VALUES FROM ('2021-05-31') TO ('2021-06-01');
-- check that both sequences are in worker
\c - - - :worker_1_port
\d seq_7
Sequence "public.seq_7"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
bigint | 281474976710657 | 281474976710657 | 562949953421313 | 1 | no | 1
\d seq_7_par
Sequence "public.seq_7_par"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
bigint | 281474976710657 | 281474976710657 | 562949953421313 | 1 | no | 1
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- Check that various ALTER SEQUENCE commands
-- are not allowed for a distributed sequence for now
CREATE SEQUENCE seq_8;
CREATE SCHEMA sequence_default_8;
-- can change schema in a sequence not yet distributed
ALTER SEQUENCE seq_8 SET SCHEMA sequence_default_8;
ALTER SEQUENCE sequence_default_8.seq_8 SET SCHEMA public;
CREATE TABLE seq_test_8 (x int, y int DEFAULT nextval('seq_8'));
SELECT create_distributed_table('seq_test_8', 'x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- cannot change sequence specifications
ALTER SEQUENCE seq_8 AS bigint;
ERROR: This operation is currently not allowed for a distributed sequence.
ALTER SEQUENCE seq_8 INCREMENT BY 2;
ERROR: This operation is currently not allowed for a distributed sequence.
ALTER SEQUENCE seq_8 MINVALUE 5 MAXVALUE 5000;
ERROR: This operation is currently not allowed for a distributed sequence.
ALTER SEQUENCE seq_8 START WITH 6;
ERROR: This operation is currently not allowed for a distributed sequence.
ALTER SEQUENCE seq_8 RESTART WITH 6;
ERROR: This operation is currently not allowed for a distributed sequence.
ALTER SEQUENCE seq_8 NO CYCLE;
ERROR: This operation is currently not allowed for a distributed sequence.
ALTER SEQUENCE seq_8 OWNED BY seq_test_7;
ERROR: This operation is currently not allowed for a distributed sequence.
-- cannot change schema in a distributed sequence
ALTER SEQUENCE seq_8 SET SCHEMA sequence_default_8;
ERROR: This operation is currently not allowed for a distributed sequence.
DROP SCHEMA sequence_default_8;
-- cannot use more than one sequence in a column default
CREATE SEQUENCE seq_9;
CREATE SEQUENCE seq_10;
CREATE TABLE seq_test_9 (x int, y int DEFAULT nextval('seq_9') - nextval('seq_10'));
SELECT create_distributed_table('seq_test_9', 'x');
ERROR: More than one sequence in a column default is not supported for distribution
-- clean up
DROP TABLE seq_test_0, seq_test_1, seq_test_2, seq_test_3, seq_test_4, seq_test_5, seq_test_6, seq_test_7, seq_test_8, seq_test_9;
DROP SEQUENCE seq_0, seq_1, sequence_2, seq_4, seq_6, seq_7, seq_7_par, seq_8, seq_9, seq_10;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)

View File

@ -2,6 +2,8 @@
-- MULTI_TABLE_DDL
--
-- Tests around changing the schema and dropping of a distributed table
-- Test DEFAULTS coming from SERIAL pseudo-types, user-defined sequences
--
SET citus.next_shard_id TO 870000;
CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL);
SELECT create_distributed_table('testtableddl', 'distributecol', 'append');
@ -98,9 +100,8 @@ SELECT create_distributed_table('testserialtable', 'group_id', 'hash');
(1 row)
-- should not be able to add additional serial columns
-- can add additional serial columns
ALTER TABLE testserialtable ADD COLUMN other_id serial;
ERROR: cannot execute ADD COLUMN commands involving serial pseudotypes
-- and we shouldn't be able to change a distributed sequence's owner
ALTER SEQUENCE testserialtable_id_seq OWNED BY NONE;
ERROR: cannot alter OWNED BY option of a sequence already owned by a distributed table
@ -108,13 +109,16 @@ ERROR: cannot alter OWNED BY option of a sequence already owned by a distribute
CREATE SEQUENCE standalone_sequence OWNED BY testserialtable.group_id;
ERROR: cannot create sequences that specify a distributed table in their OWNED BY option
HINT: Use a sequence in a distributed table by specifying a serial column type before creating any shards.
-- EDIT: this doesn't error out for now in order to allow adding
-- new serial columns (they always come with owned_by command)
-- should be fixed later in ALTER SEQUENCE preprocessing
-- or even change a manual sequence to be owned by a distributed table
CREATE SEQUENCE standalone_sequence;
ALTER SEQUENCE standalone_sequence OWNED BY testserialtable.group_id;
ERROR: cannot associate an existing sequence with a distributed table
HINT: Use a sequence in a distributed table by specifying a serial column type before creating any shards.
-- an edge case, but it's OK to change an owner to the same distributed table
-- EDIT: this doesn't work for now for a distributed sequence
ALTER SEQUENCE testserialtable_id_seq OWNED BY testserialtable.id;
ERROR: This operation is currently not allowed for a distributed sequence.
-- drop distributed table
\c - - - :master_port
DROP TABLE testserialtable;
@ -126,3 +130,38 @@ DROP TABLE testserialtable;
---------------------------------------------------------------------
(0 rows)
\c - - - :master_port
-- test DEFAULT coming from SERIAL pseudo-types and user-defined sequences
CREATE SEQUENCE test_sequence_0;
CREATE SEQUENCE test_sequence_1;
CREATE TABLE test_table (id1 int DEFAULT nextval('test_sequence_0'));
SELECT create_distributed_table('test_table', 'id1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- shouldn't work since it's partition column
ALTER TABLE test_table ALTER COLUMN id1 SET DEFAULT nextval('test_sequence_1');
ERROR: cannot execute ALTER TABLE command involving partition column
-- test different plausible commands
ALTER TABLE test_table ADD COLUMN id2 int DEFAULT nextval('test_sequence_1');
ALTER TABLE test_table ALTER COLUMN id2 DROP DEFAULT;
ALTER TABLE test_table ALTER COLUMN id2 SET DEFAULT nextval('test_sequence_1');
ALTER TABLE test_table ADD COLUMN id3 bigserial;
-- shouldn't work since the above operations should be the only subcommands
ALTER TABLE test_table ADD COLUMN id4 int DEFAULT nextval('test_sequence_1') CHECK (id4 > 0);
ERROR: cannot execute ADD COLUMN .. DEFAULT nextval('..') command with other subcommands/constraints
HINT: You can issue each subcommand separately
ALTER TABLE test_table ADD COLUMN id4 int, ADD COLUMN id5 int DEFAULT nextval('test_sequence_1');
ERROR: cannot execute ADD COLUMN .. DEFAULT nextval('..') command with other subcommands/constraints
HINT: You can issue each subcommand separately
ALTER TABLE test_table ALTER COLUMN id3 SET DEFAULT nextval('test_sequence_1'), ALTER COLUMN id2 DROP DEFAULT;
ERROR: cannot execute ALTER COLUMN COLUMN .. SET DEFAULT nextval('..') command with other subcommands
HINT: You can issue each subcommand separately
ALTER TABLE test_table ADD COLUMN id4 bigserial CHECK (id4 > 0);
ERROR: cannot execute ADD COLUMN commands involving serial pseudotypes with other subcommands/constraints
HINT: You can issue each subcommand separately
DROP TABLE test_table CASCADE;
DROP SEQUENCE test_sequence_0;
DROP SEQUENCE test_sequence_1;

View File

@ -31,7 +31,7 @@ insert into test_hash_dist values (1, 1);
WARNING: fake_tuple_insert
select create_distributed_table('test_hash_dist','id');
WARNING: fake_scan_getnextslot
CONTEXT: SQL statement "SELECT EXISTS (SELECT 1 FROM test_tableam.test_hash_dist)"
CONTEXT: SQL statement "SELECT TRUE FROM test_tableam.test_hash_dist LIMIT 1"
WARNING: fake_scan_getnextslot
NOTICE: Copying data from local table...
WARNING: fake_scan_getnextslot
@ -85,7 +85,7 @@ insert into test_ref values (1);
WARNING: fake_tuple_insert
select create_reference_table('test_ref');
WARNING: fake_scan_getnextslot
CONTEXT: SQL statement "SELECT EXISTS (SELECT 1 FROM test_tableam.test_ref)"
CONTEXT: SQL statement "SELECT TRUE FROM test_tableam.test_ref LIMIT 1"
WARNING: fake_scan_getnextslot
NOTICE: Copying data from local table...
WARNING: fake_scan_getnextslot
@ -156,9 +156,9 @@ SELECT master_remove_node('localhost', :master_port);
CREATE TABLE test_range_dist(id int, val int) using fake_am;
SELECT create_distributed_table('test_range_dist', 'id', 'range');
WARNING: fake_scan_getnextslot
CONTEXT: SQL statement "SELECT EXISTS (SELECT 1 FROM test_tableam.test_range_dist)"
CONTEXT: SQL statement "SELECT TRUE FROM test_tableam.test_range_dist LIMIT 1"
WARNING: fake_scan_getnextslot
CONTEXT: SQL statement "SELECT EXISTS (SELECT 1 FROM test_tableam.test_range_dist)"
CONTEXT: SQL statement "SELECT TRUE FROM test_tableam.test_range_dist LIMIT 1"
create_distributed_table
---------------------------------------------------------------------
@ -280,7 +280,7 @@ NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test_partitioned_p1$$)
WARNING: fake_scan_getnextslot
CONTEXT: SQL statement "SELECT EXISTS (SELECT 1 FROM public.test_partitioned_p2)"
CONTEXT: SQL statement "SELECT TRUE FROM public.test_partitioned_p2 LIMIT 1"
WARNING: fake_scan_getnextslot
NOTICE: Copying data from local table...
WARNING: fake_scan_getnextslot

View File

@ -31,6 +31,7 @@ test: alter_database_owner
test: multi_test_helpers multi_test_helpers_superuser
test: multi_test_catalog_views
test: multi_table_ddl
test: multi_sequence_default
test: multi_name_lengths
test: multi_name_resolution
test: multi_metadata_access

View File

@ -623,7 +623,15 @@ DO UPDATE SET
create table table_with_sequences (x int, y int, z bigserial);
insert into table_with_sequences values (1,1);
select create_distributed_table('table_with_sequences','x');
explain insert into table_with_sequences select y, x from table_with_sequences;
explain (costs off) insert into table_with_sequences select y, x from table_with_sequences;
-- verify that we don't report repartitioned insert/select for tables
-- with user-defined sequences.
CREATE SEQUENCE user_defined_sequence;
create table table_with_user_sequences (x int, y int, z bigint default nextval('user_defined_sequence'));
insert into table_with_user_sequences values (1,1);
select create_distributed_table('table_with_user_sequences','x');
explain (costs off) insert into table_with_user_sequences select y, x from table_with_user_sequences;
-- clean-up
SET client_min_messages TO WARNING;

View File

@ -1729,7 +1729,7 @@ FROM
GROUP BY
store_id, first_name, last_name;
-- Volatile function in default should be disallowed
-- Volatile function in default should be disallowed - SERIAL pseudo-types
CREATE TABLE table_with_serial (
store_id int,
s bigserial
@ -1744,6 +1744,22 @@ FROM
GROUP BY
store_id;
-- Volatile function in default should be disallowed - user-defined sequence
CREATE SEQUENCE user_defined_sequence;
CREATE TABLE table_with_user_sequence (
store_id int,
s bigint default nextval('user_defined_sequence')
);
SELECT create_distributed_table('table_with_user_sequence', 'store_id');
INSERT INTO table_with_user_sequence (store_id)
SELECT
store_id
FROM
table_with_defaults
GROUP BY
store_id;
-- do some more error/error message checks
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
@ -2018,6 +2034,48 @@ SELECT * FROM dist_table_with_sequence ORDER BY user_id, value_1;
DROP TABLE dist_table_with_sequence;
-- Select into distributed table with a user-defined sequence
CREATE SEQUENCE seq1;
CREATE SEQUENCE seq2;
CREATE TABLE dist_table_with_user_sequence (user_id int default nextval('seq1'), value_1 bigint default nextval('seq2'));
SELECT create_distributed_table('dist_table_with_user_sequence', 'user_id');
-- from local query
INSERT INTO dist_table_with_user_sequence (value_1)
SELECT s FROM generate_series(1,5) s;
SELECT * FROM dist_table_with_user_sequence ORDER BY user_id, value_1;
-- from a distributed query
INSERT INTO dist_table_with_user_sequence (value_1)
SELECT value_1 FROM dist_table_with_user_sequence ORDER BY value_1;
SELECT * FROM dist_table_with_user_sequence ORDER BY user_id, value_1;
TRUNCATE dist_table_with_user_sequence;
INSERT INTO dist_table_with_user_sequence (user_id)
SELECT user_id FROM raw_events_second ORDER BY user_id;
SELECT * FROM dist_table_with_user_sequence ORDER BY user_id, value_1;
WITH top10 AS (
SELECT user_id FROM raw_events_second WHERE value_1 IS NOT NULL ORDER BY value_1 LIMIT 10
)
INSERT INTO dist_table_with_user_sequence (value_1)
SELECT * FROM top10;
SELECT * FROM dist_table_with_user_sequence ORDER BY user_id, value_1;
-- router queries become logical planner queries when there is a nextval call
INSERT INTO dist_table_with_user_sequence (user_id)
SELECT user_id FROM dist_table_with_user_sequence WHERE user_id = 1;
SELECT * FROM dist_table_with_user_sequence ORDER BY user_id, value_1;
DROP TABLE dist_table_with_user_sequence;
DROP SEQUENCE seq1, seq2;
-- Select from distributed table into reference table
CREATE TABLE ref_table (user_id serial, value_1 int);
SELECT create_reference_table('ref_table');
@ -2037,6 +2095,27 @@ SELECT * FROM ref_table ORDER BY user_id, value_1;
DROP TABLE ref_table;
-- Select from distributed table into reference table with user-defined sequence
CREATE SEQUENCE seq1;
CREATE TABLE ref_table_with_user_sequence (user_id int default nextval('seq1'), value_1 int);
SELECT create_reference_table('ref_table_with_user_sequence');
INSERT INTO ref_table_with_user_sequence
SELECT user_id, value_1 FROM raw_events_second;
SELECT * FROM ref_table_with_user_sequence ORDER BY user_id, value_1;
INSERT INTO ref_table_with_user_sequence (value_1)
SELECT value_1 FROM raw_events_second ORDER BY value_1;
SELECT * FROM ref_table_with_user_sequence ORDER BY user_id, value_1;
INSERT INTO ref_table_with_user_sequence SELECT * FROM ref_table_with_user_sequence;
SELECT * FROM ref_table_with_user_sequence ORDER BY user_id, value_1;
DROP TABLE ref_table_with_user_sequence;
DROP SEQUENCE seq1;
-- Select from reference table into reference table
CREATE TABLE ref1 (d timestamptz);
SELECT create_reference_table('ref1');
@ -2269,6 +2348,8 @@ DROP TABLE reference_table;
DROP TABLE agg_events;
DROP TABLE table_with_defaults;
DROP TABLE table_with_serial;
DROP TABLE table_with_user_sequence;
DROP SEQUENCE user_defined_sequence;
DROP TABLE text_table;
DROP TABLE char_table;
DROP TABLE table_with_starts_with_defaults;

View File

@ -51,8 +51,9 @@ CREATE OR REPLACE FUNCTION pg_catalog.master_create_worker_shards(table_name tex
AS 'citus', $$master_create_worker_shards$$
LANGUAGE C STRICT;
-- Create a test table with constraints and SERIAL
CREATE TABLE mx_test_table (col_1 int UNIQUE, col_2 text NOT NULL, col_3 BIGSERIAL);
-- Create a test table with constraints and SERIAL and default from user defined sequence
CREATE SEQUENCE user_defined_seq;
CREATE TABLE mx_test_table (col_1 int UNIQUE, col_2 text NOT NULL, col_3 BIGSERIAL, col_4 BIGINT DEFAULT nextval('user_defined_seq'));
SELECT master_create_distributed_table('mx_test_table', 'col_1', 'hash');
SELECT master_create_worker_shards('mx_test_table', 8, 1);
@ -82,6 +83,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass;
SELECT unnest(master_metadata_snapshot()) order by 1;
-- Test start_metadata_sync_to_node UDF
-- Ensure that hasmetadata=false for all nodes
@ -760,6 +762,51 @@ SELECT pg_reload_conf();
UPDATE pg_dist_node SET metadatasynced=true WHERE nodeport=:worker_1_port;
SELECT master_add_node('localhost', :worker_2_port);
SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
CREATE SEQUENCE mx_test_sequence_0;
CREATE SEQUENCE mx_test_sequence_1;
-- test create_distributed_table
CREATE TABLE test_table (id int DEFAULT nextval('mx_test_sequence_0'));
SELECT create_distributed_table('test_table', 'id');
-- shouldn't work since it's partition column
ALTER TABLE test_table ALTER COLUMN id SET DEFAULT nextval('mx_test_sequence_1');
-- test different plausible commands
ALTER TABLE test_table ADD COLUMN id2 int DEFAULT nextval('mx_test_sequence_1');
ALTER TABLE test_table ALTER COLUMN id2 DROP DEFAULT;
ALTER TABLE test_table ALTER COLUMN id2 SET DEFAULT nextval('mx_test_sequence_1');
SELECT unnest(master_metadata_snapshot()) order by 1;
-- shouldn't work since test_table is MX
ALTER TABLE test_table ADD COLUMN id3 bigserial;
-- shouldn't work since the above operations should be the only subcommands
ALTER TABLE test_table ADD COLUMN id4 int DEFAULT nextval('mx_test_sequence_1') CHECK (id4 > 0);
ALTER TABLE test_table ADD COLUMN id4 int, ADD COLUMN id5 int DEFAULT nextval('mx_test_sequence_1');
ALTER TABLE test_table ALTER COLUMN id1 SET DEFAULT nextval('mx_test_sequence_1'), ALTER COLUMN id2 DROP DEFAULT;
ALTER TABLE test_table ADD COLUMN id4 bigserial CHECK (id4 > 0);
\c - - - :worker_1_port
\ds
\c - - - :master_port
CREATE SEQUENCE local_sequence;
-- verify that DROP SEQUENCE will propagate the command to workers for
-- the distributed sequences mx_test_sequence_0 and mx_test_sequence_1
DROP SEQUENCE mx_test_sequence_0, mx_test_sequence_1, local_sequence CASCADE;
\c - - - :worker_1_port
\ds
\c - - - :master_port
DROP TABLE test_table CASCADE;
-- Cleanup
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);

View File

@ -61,6 +61,10 @@ SELECT create_reference_table('ref_table');
CREATE TABLE dist_table_1(a int primary key, b int references ref_table(a));
SELECT create_distributed_table('dist_table_1', 'a');
CREATE SEQUENCE sequence;
CREATE TABLE reference_table (a int default nextval('sequence'));
SELECT create_reference_table('reference_table');
-- update the node
SELECT 1 FROM master_update_node((SELECT nodeid FROM pg_dist_node),
'localhost', :worker_2_port);
@ -354,7 +358,9 @@ DROP DATABASE db_to_drop;
SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
-- cleanup
DROP SEQUENCE sequence CASCADE;
DROP TABLE ref_table;
DROP TABLE reference_table;
TRUNCATE pg_dist_colocation;
SELECT count(*) FROM (SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node) t;
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;

View File

@ -20,7 +20,7 @@ INSERT INTO source_table (a, b) VALUES (1, 5);
INSERT INTO source_table (a, b) VALUES (10, 10);
-- simulate actually having secondary nodes
SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node;
SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node ORDER BY 1, 2;
UPDATE pg_dist_node SET noderole = 'secondary';
\c "dbname=regression options='-c\ citus.use_secondary_nodes=always'"

View File

@ -10,7 +10,7 @@ SELECT create_distributed_table('source_table_xyz', 'key', 'range');
CALL public.create_range_partitioned_shards('source_table_xyz', '{"(0,a)","(25,z)"}','{"(24,a)","(49,z)"}');
SELECT * FROM pg_dist_shard WHERE logicalrelid='source_table_xyz'::regclass::oid ORDER BY shardid;
SELECT shardid, nodename, nodeport FROM pg_dist_shard_placement WHERE EXISTS(SELECT shardid FROM pg_dist_shard WHERE shardid=pg_dist_shard_placement.shardid AND logicalrelid='source_table_xyz'::regclass::oid);
SELECT shardid, nodename, nodeport FROM pg_dist_shard_placement WHERE EXISTS(SELECT shardid FROM pg_dist_shard WHERE shardid=pg_dist_shard_placement.shardid AND logicalrelid='source_table_xyz'::regclass::oid) ORDER BY 1, 2, 3;
INSERT INTO source_table_xyz VALUES ((0, 'a'), 1, (0, 'a')),
((1, 'b'), 2, (26, 'b')),

View File

@ -0,0 +1,215 @@
--
-- MULTI_SEQUENCE_DEFAULT
--
-- Tests related to column defaults coming from a sequence
--
SET citus.next_shard_id TO 890000;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
-- Cannot add a column involving DEFAULT nextval('..') because the table is not empty
CREATE SEQUENCE seq_0;
CREATE TABLE seq_test_0 (x int, y int);
SELECT create_distributed_table('seq_test_0','x');
INSERT INTO seq_test_0 SELECT 1, s FROM generate_series(1, 50) s;
ALTER TABLE seq_test_0 ADD COLUMN z int DEFAULT nextval('seq_0');
ALTER TABLE seq_test_0 ADD COLUMN z serial;
-- follow hint
ALTER TABLE seq_test_0 ADD COLUMN z int;
ALTER TABLE seq_test_0 ALTER COLUMN z SET DEFAULT nextval('seq_0');
SELECT * FROM seq_test_0 ORDER BY 1, 2 LIMIT 5;
\d seq_test_0
-- check that we can add serial pseudo-type columns
-- when metadata is not yet synced to workers
TRUNCATE seq_test_0;
ALTER TABLE seq_test_0 ADD COLUMN w00 smallserial;
ALTER TABLE seq_test_0 ADD COLUMN w01 serial2;
ALTER TABLE seq_test_0 ADD COLUMN w10 serial;
ALTER TABLE seq_test_0 ADD COLUMN w11 serial4;
ALTER TABLE seq_test_0 ADD COLUMN w20 bigserial;
ALTER TABLE seq_test_0 ADD COLUMN w21 serial8;
-- check alter column type precaution
ALTER TABLE seq_test_0 ALTER COLUMN z TYPE bigint;
ALTER TABLE seq_test_0 ALTER COLUMN z TYPE smallint;
-- MX tests
-- check that there's not problem with group ID cache
CREATE TABLE seq_test_4 (x int, y int);
SELECT create_distributed_table('seq_test_4','x');
CREATE SEQUENCE seq_4;
ALTER TABLE seq_test_4 ADD COLUMN a int DEFAULT nextval('seq_4');
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
DROP SEQUENCE seq_4 CASCADE;
TRUNCATE seq_test_4;
CREATE SEQUENCE seq_4;
ALTER TABLE seq_test_4 ADD COLUMN b int DEFAULT nextval('seq_4');
-- on worker it should generate high sequence number
\c - - - :worker_1_port
INSERT INTO seq_test_4 VALUES (1,2) RETURNING *;
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- check sequence type consistency in all nodes
CREATE SEQUENCE seq_1;
-- type is bigint by default
\d seq_1
CREATE TABLE seq_test_1 (x int, y int);
SELECT create_distributed_table('seq_test_1','x');
ALTER TABLE seq_test_1 ADD COLUMN z int DEFAULT nextval('seq_1');
-- type is changed to int
\d seq_1
-- check insertion is within int bounds in the worker
\c - - - :worker_1_port
INSERT INTO seq_test_1 values (1, 2) RETURNING *;
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- check that we cannot add serial pseudo-type columns
-- when metadata is synced to workers
ALTER TABLE seq_test_1 ADD COLUMN w bigserial;
-- check for sequence type clashes
CREATE SEQUENCE seq_2;
CREATE TABLE seq_test_2 (x int, y bigint DEFAULT nextval('seq_2'));
-- should work
SELECT create_distributed_table('seq_test_2','x');
DROP TABLE seq_test_2;
CREATE TABLE seq_test_2 (x int, y int DEFAULT nextval('seq_2'));
-- should work
SELECT create_distributed_table('seq_test_2','x');
CREATE TABLE seq_test_2_0(x int, y smallint DEFAULT nextval('seq_2'));
-- shouldn't work
SELECT create_distributed_table('seq_test_2_0','x');
DROP TABLE seq_test_2;
DROP TABLE seq_test_2_0;
-- should work
CREATE TABLE seq_test_2 (x int, y bigint DEFAULT nextval('seq_2'));
SELECT create_distributed_table('seq_test_2','x');
DROP TABLE seq_test_2;
CREATE TABLE seq_test_2 (x int, y int DEFAULT nextval('seq_2'), z bigint DEFAULT nextval('seq_2'));
-- shouldn't work
SELECT create_distributed_table('seq_test_2','x');
-- check rename is propagated properly
ALTER SEQUENCE seq_2 RENAME TO sequence_2;
-- check in the worker
\c - - - :worker_1_port
\d sequence_2
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- check rename with another schema
-- we notice that schema is also propagated as one of the sequence's dependencies
CREATE SCHEMA sequence_default_0;
SET search_path TO public, sequence_default_0;
CREATE SEQUENCE sequence_default_0.seq_3;
CREATE TABLE seq_test_3 (x int, y bigint DEFAULT nextval('seq_3'));
SELECT create_distributed_table('seq_test_3', 'x');
ALTER SEQUENCE sequence_default_0.seq_3 RENAME TO sequence_3;
-- check in the worker
\c - - - :worker_1_port
\d sequence_default_0.sequence_3
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
DROP SEQUENCE sequence_default_0.sequence_3 CASCADE;
DROP SCHEMA sequence_default_0;
-- DROP SCHEMA problem: expected since we don't propagate DROP SCHEMA
CREATE TABLE seq_test_5 (x int, y int);
SELECT create_distributed_table('seq_test_5','x');
CREATE SCHEMA sequence_default_1;
CREATE SEQUENCE sequence_default_1.seq_5;
ALTER TABLE seq_test_5 ADD COLUMN a int DEFAULT nextval('sequence_default_1.seq_5');
DROP SCHEMA sequence_default_1 CASCADE;
-- sequence is gone from coordinator
INSERT INTO seq_test_5 VALUES (1, 2) RETURNING *;
-- but is still present on worker
\c - - - :worker_1_port
INSERT INTO seq_test_5 VALUES (1, 2) RETURNING *;
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- apply workaround
SELECT run_command_on_workers('DROP SCHEMA sequence_default_1 CASCADE');
-- now the sequence is gone from the worker as well
\c - - - :worker_1_port
INSERT INTO seq_test_5 VALUES (1, 2) RETURNING *;
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- check some more complex cases
CREATE SEQUENCE seq_6;
CREATE TABLE seq_test_6 (x int, t timestamptz DEFAULT now(), s int DEFAULT nextval('seq_6'), m int) PARTITION BY RANGE (t);
SELECT create_distributed_table('seq_test_6','x');
-- shouldn't work since x is the partition column
ALTER TABLE seq_test_6 ALTER COLUMN x SET DEFAULT nextval('seq_6');
-- should work since both s and m have int type
ALTER TABLE seq_test_6 ALTER COLUMN m SET DEFAULT nextval('seq_6');
-- It is possible for a partition to have a different DEFAULT than its parent
CREATE SEQUENCE seq_7;
CREATE TABLE seq_test_7 (x text, s bigint DEFAULT nextval('seq_7'), t timestamptz DEFAULT now()) PARTITION BY RANGE (t);
SELECT create_distributed_table('seq_test_7','x');
CREATE SEQUENCE seq_7_par;
CREATE TABLE seq_test_7_par (x text, s bigint DEFAULT nextval('seq_7_par'), t timestamptz DEFAULT now());
ALTER TABLE seq_test_7 ATTACH PARTITION seq_test_7_par FOR VALUES FROM ('2021-05-31') TO ('2021-06-01');
-- check that both sequences are in worker
\c - - - :worker_1_port
\d seq_7
\d seq_7_par
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- Check that various ALTER SEQUENCE commands
-- are not allowed for a distributed sequence for now
CREATE SEQUENCE seq_8;
CREATE SCHEMA sequence_default_8;
-- can change schema in a sequence not yet distributed
ALTER SEQUENCE seq_8 SET SCHEMA sequence_default_8;
ALTER SEQUENCE sequence_default_8.seq_8 SET SCHEMA public;
CREATE TABLE seq_test_8 (x int, y int DEFAULT nextval('seq_8'));
SELECT create_distributed_table('seq_test_8', 'x');
-- cannot change sequence specifications
ALTER SEQUENCE seq_8 AS bigint;
ALTER SEQUENCE seq_8 INCREMENT BY 2;
ALTER SEQUENCE seq_8 MINVALUE 5 MAXVALUE 5000;
ALTER SEQUENCE seq_8 START WITH 6;
ALTER SEQUENCE seq_8 RESTART WITH 6;
ALTER SEQUENCE seq_8 NO CYCLE;
ALTER SEQUENCE seq_8 OWNED BY seq_test_7;
-- cannot change schema in a distributed sequence
ALTER SEQUENCE seq_8 SET SCHEMA sequence_default_8;
DROP SCHEMA sequence_default_8;
-- cannot use more than one sequence in a column default
CREATE SEQUENCE seq_9;
CREATE SEQUENCE seq_10;
CREATE TABLE seq_test_9 (x int, y int DEFAULT nextval('seq_9') - nextval('seq_10'));
SELECT create_distributed_table('seq_test_9', 'x');
-- clean up
DROP TABLE seq_test_0, seq_test_1, seq_test_2, seq_test_3, seq_test_4, seq_test_5, seq_test_6, seq_test_7, seq_test_8, seq_test_9;
DROP SEQUENCE seq_0, seq_1, sequence_2, seq_4, seq_6, seq_7, seq_7_par, seq_8, seq_9, seq_10;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);

View File

@ -2,6 +2,8 @@
-- MULTI_TABLE_DDL
--
-- Tests around changing the schema and dropping of a distributed table
-- Test DEFAULTS coming from SERIAL pseudo-types, user-defined sequences
--
SET citus.next_shard_id TO 870000;
@ -66,7 +68,7 @@ SET citus.shard_count TO 2;
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('testserialtable', 'group_id', 'hash');
-- should not be able to add additional serial columns
-- can add additional serial columns
ALTER TABLE testserialtable ADD COLUMN other_id serial;
-- and we shouldn't be able to change a distributed sequence's owner
@ -75,11 +77,16 @@ ALTER SEQUENCE testserialtable_id_seq OWNED BY NONE;
-- or create a sequence with a distributed owner
CREATE SEQUENCE standalone_sequence OWNED BY testserialtable.group_id;
-- EDIT: this doesn't error out for now in order to allow adding
-- new serial columns (they always come with owned_by command)
-- should be fixed later in ALTER SEQUENCE preprocessing
-- or even change a manual sequence to be owned by a distributed table
CREATE SEQUENCE standalone_sequence;
ALTER SEQUENCE standalone_sequence OWNED BY testserialtable.group_id;
-- an edge case, but it's OK to change an owner to the same distributed table
-- EDIT: this doesn't work for now for a distributed sequence
ALTER SEQUENCE testserialtable_id_seq OWNED BY testserialtable.id;
-- drop distributed table
@ -89,3 +96,32 @@ DROP TABLE testserialtable;
-- verify owned sequence is dropped
\c - - - :worker_1_port
\ds
\c - - - :master_port
-- test DEFAULT coming from SERIAL pseudo-types and user-defined sequences
CREATE SEQUENCE test_sequence_0;
CREATE SEQUENCE test_sequence_1;
CREATE TABLE test_table (id1 int DEFAULT nextval('test_sequence_0'));
SELECT create_distributed_table('test_table', 'id1');
-- shouldn't work since it's partition column
ALTER TABLE test_table ALTER COLUMN id1 SET DEFAULT nextval('test_sequence_1');
-- test different plausible commands
ALTER TABLE test_table ADD COLUMN id2 int DEFAULT nextval('test_sequence_1');
ALTER TABLE test_table ALTER COLUMN id2 DROP DEFAULT;
ALTER TABLE test_table ALTER COLUMN id2 SET DEFAULT nextval('test_sequence_1');
ALTER TABLE test_table ADD COLUMN id3 bigserial;
-- shouldn't work since the above operations should be the only subcommands
ALTER TABLE test_table ADD COLUMN id4 int DEFAULT nextval('test_sequence_1') CHECK (id4 > 0);
ALTER TABLE test_table ADD COLUMN id4 int, ADD COLUMN id5 int DEFAULT nextval('test_sequence_1');
ALTER TABLE test_table ALTER COLUMN id3 SET DEFAULT nextval('test_sequence_1'), ALTER COLUMN id2 DROP DEFAULT;
ALTER TABLE test_table ADD COLUMN id4 bigserial CHECK (id4 > 0);
DROP TABLE test_table CASCADE;
DROP SEQUENCE test_sequence_0;
DROP SEQUENCE test_sequence_1;