Identity Column Support on Citus Managed Tables (#6591)

DESCRIPTION: Identity Column Support on Citus Managed Tables
pull/6630/head^2
Gokhan Gulbiz 2023-01-19 15:45:41 +03:00 committed by GitHub
parent 64e3fee89b
commit 2388fbea6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1244 additions and 137 deletions

View File

@ -570,8 +570,11 @@ ConvertTable(TableConversionState *con)
char *newAccessMethod = con->accessMethod ? con->accessMethod :
con->originalAccessMethod;
IncludeSequenceDefaults includeSequenceDefaults = NEXTVAL_SEQUENCE_DEFAULTS;
IncludeIdentities includeIdentity = INCLUDE_IDENTITY;
List *preLoadCommands = GetPreLoadTableCreationCommands(con->relationId,
includeSequenceDefaults,
includeIdentity,
newAccessMethod);
if (con->accessMethod && strcmp(con->accessMethod, "columnar") == 0)
@ -1096,19 +1099,7 @@ CreateTableConversion(TableConversionParameters *params)
"because no such table exists")));
}
TupleDesc relationDesc = RelationGetDescr(relation);
if (RelationUsesIdentityColumns(relationDesc))
{
/*
* pg_get_tableschemadef_string doesn't know how to deparse identity
* columns so we cannot reflect those columns when creating table
* from scratch. For this reason, error out here.
*/
ereport(ERROR, (errmsg("cannot complete command because relation "
"%s has identity column",
generate_qualified_relation_name(con->relationId)),
errhint("Drop the identity columns and re-try the command")));
}
relation_close(relation, NoLock);
con->distributionKey =
BuildDistributionKeyFromColumnName(con->relationId, con->distributionColumn,
@ -1532,6 +1523,96 @@ CreateMaterializedViewDDLCommand(Oid matViewOid)
}
/*
* This function marks all the identity sequences as distributed on the given table.
*/
static void
MarkIdentitiesAsDistributed(Oid targetRelationId)
{
Relation relation = relation_open(targetRelationId, AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(relation);
relation_close(relation, NoLock);
bool missingSequenceOk = false;
for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts;
attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex);
if (attributeForm->attidentity)
{
Oid seqOid = getIdentitySequence(targetRelationId, attributeForm->attnum,
missingSequenceOk);
ObjectAddress seqAddress = { 0 };
ObjectAddressSet(seqAddress, RelationRelationId, seqOid);
MarkObjectDistributed(&seqAddress);
}
}
}
/*
* This function returns sql statements to rename identites on the given table
*/
static void
PrepareRenameIdentitiesCommands(Oid sourceRelationId, Oid targetRelationId,
List **outCoordinatorCommands, List **outWorkerCommands)
{
Relation targetRelation = relation_open(targetRelationId, AccessShareLock);
TupleDesc targetTupleDescriptor = RelationGetDescr(targetRelation);
relation_close(targetRelation, NoLock);
bool missingSequenceOk = false;
for (int attributeIndex = 0; attributeIndex < targetTupleDescriptor->natts;
attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(targetTupleDescriptor,
attributeIndex);
if (attributeForm->attidentity)
{
char *columnName = NameStr(attributeForm->attname);
Oid targetSequenceOid = getIdentitySequence(targetRelationId,
attributeForm->attnum,
missingSequenceOk);
char *targetSequenceName = generate_relation_name(targetSequenceOid, NIL);
Oid sourceSequenceOid = getIdentitySequence(sourceRelationId,
attributeForm->attnum,
missingSequenceOk);
char *sourceSequenceName = generate_relation_name(sourceSequenceOid, NIL);
/* to rename sequence on the coordinator */
*outCoordinatorCommands = lappend(*outCoordinatorCommands, psprintf(
"SET citus.enable_ddl_propagation TO OFF; ALTER SEQUENCE %s RENAME TO %s; RESET citus.enable_ddl_propagation;",
quote_identifier(
targetSequenceName),
quote_identifier(
sourceSequenceName)));
/* update workers to use existing sequence and drop the new one generated by PG */
bool missingTableOk = true;
*outWorkerCommands = lappend(*outWorkerCommands,
GetAlterColumnWithNextvalDefaultCmd(
sourceSequenceOid, sourceRelationId,
columnName,
missingTableOk));
/* drop the sequence generated by identity column */
*outWorkerCommands = lappend(*outWorkerCommands, psprintf(
"DROP SEQUENCE IF EXISTS %s",
quote_identifier(
targetSequenceName)));
}
}
}
/*
* ReplaceTable replaces the source table with the target table.
* It moves all the rows of the source table to target table with INSERT SELECT.
@ -1580,7 +1661,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
*/
List *nonStoredColumnNameList = GetNonGeneratedStoredColumnNameList(sourceId);
char *insertColumnString = StringJoin(nonStoredColumnNameList, ',');
appendStringInfo(query, "INSERT INTO %s (%s) SELECT %s FROM %s",
appendStringInfo(query,
"INSERT INTO %s (%s) OVERRIDING SYSTEM VALUE SELECT %s FROM %s",
quote_qualified_identifier(schemaName, targetName),
insertColumnString, insertColumnString,
quote_qualified_identifier(schemaName, sourceName));
@ -1589,7 +1671,28 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
ExecuteQueryViaSPI(query->data, SPI_OK_INSERT);
}
List *ownedSequences = getOwnedSequences(sourceId);
/*
* Drop identity dependencies (sequences marked as DEPENDENCY_INTERNAL) on the workers
* to keep their states after the source table is dropped.
*/
List *ownedIdentitySequences = getOwnedSequences_internal(sourceId, 0,
DEPENDENCY_INTERNAL);
if (ownedIdentitySequences != NIL && ShouldSyncTableMetadata(sourceId))
{
char *qualifiedTableName = quote_qualified_identifier(schemaName, sourceName);
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT pg_catalog.worker_drop_sequence_dependency(%s);",
quote_literal_cstr(qualifiedTableName));
SendCommandToWorkersWithMetadata(command->data);
}
/*
* Modify regular sequence dependencies (sequences marked as DEPENDENCY_AUTO)
*/
List *ownedSequences = getOwnedSequences_internal(sourceId, 0, DEPENDENCY_AUTO);
Oid sequenceOid = InvalidOid;
foreach_oid(sequenceOid, ownedSequences)
{
@ -1645,6 +1748,23 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
quote_qualified_identifier(schemaName, sourceName))));
}
/*
* We need to prepare rename identities commands before dropping the original table,
* otherwise we can't find the original names of the identity sequences.
* We prepare separate commands for the coordinator and the workers because:
* In the coordinator, we simply need to rename the identity sequences
* to their names on the old table, because right now the identity
* sequences have default names generated by Postgres with the creation of the new table
* In the workers, we have not dropped the original identity sequences,
* so what we do is we alter the columns and set their default to the
* original identity sequences, and after that we drop the new sequences.
*/
List *coordinatorCommandsToRenameIdentites = NIL;
List *workerCommandsToRenameIdentites = NIL;
PrepareRenameIdentitiesCommands(sourceId, targetId,
&coordinatorCommandsToRenameIdentites,
&workerCommandsToRenameIdentites);
resetStringInfo(query);
appendStringInfo(query, "DROP %sTABLE %s CASCADE",
IsForeignTable(sourceId) ? "FOREIGN " : "",
@ -1662,6 +1782,27 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
quote_qualified_identifier(schemaName, targetName),
quote_identifier(sourceName));
ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY);
char *coordinatorCommand = NULL;
foreach_ptr(coordinatorCommand, coordinatorCommandsToRenameIdentites)
{
ExecuteQueryViaSPI(coordinatorCommand, SPI_OK_UTILITY);
}
char *workerCommand = NULL;
foreach_ptr(workerCommand, workerCommandsToRenameIdentites)
{
SendCommandToWorkersWithMetadata(workerCommand);
}
/*
* To preserve identity sequences states in case of redistributing the table again,
* we don't drop them when we undistribute a table. To maintain consistency and
* avoid future problems if we redistribute the table, we want to apply all changes happening to
* the identity sequence in the coordinator to their corresponding sequences in the workers as well.
* That's why we have to mark identity sequences as distributed
*/
MarkIdentitiesAsDistributed(targetId);
}

View File

@ -60,7 +60,6 @@ static void citus_add_local_table_to_metadata_internal(Oid relationId,
static void ErrorIfAddingPartitionTableToMetadata(Oid relationId);
static void ErrorIfUnsupportedCreateCitusLocalTable(Relation relation);
static void ErrorIfUnsupportedCitusLocalTableKind(Oid relationId);
static void ErrorIfUnsupportedCitusLocalColumnDefinition(Relation relation);
static void NoticeIfAutoConvertingLocalTables(bool autoConverted, Oid relationId);
static CascadeOperationType GetCascadeTypeForCitusLocalTables(bool autoConverted);
static List * GetShellTableDDLEventsForCitusLocalTable(Oid relationId);
@ -82,6 +81,7 @@ static char * GetRenameShardTriggerCommand(Oid shardRelationId, char *triggerNam
static void DropRelationTruncateTriggers(Oid relationId);
static char * GetDropTriggerCommand(Oid relationId, char *triggerName);
static void DropViewsOnTable(Oid relationId);
static void DropIdentitiesOnTable(Oid relationId);
static List * GetRenameStatsCommandList(List *statsOidList, uint64 shardId);
static List * ReversedOidList(List *oidList);
static void AppendExplicitIndexIdsToList(Form_pg_index indexForm,
@ -338,6 +338,12 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
char *relationName = get_rel_name(relationId);
Oid relationSchemaId = get_rel_namespace(relationId);
/*
* Drop identities before local shard conversion since the shell table owns
* identities
*/
DropIdentitiesOnTable(relationId);
/* below we convert relation with relationId to the shard relation */
uint64 shardId = ConvertLocalTableToShard(relationId);
@ -486,7 +492,6 @@ ErrorIfUnsupportedCreateCitusLocalTable(Relation relation)
ErrorIfCoordinatorNotAddedAsWorkerNode();
ErrorIfUnsupportedCitusLocalTableKind(relationId);
EnsureTableNotDistributed(relationId);
ErrorIfUnsupportedCitusLocalColumnDefinition(relation);
ErrorIfRelationHasUnsupportedTrigger(relationId);
/*
@ -549,30 +554,6 @@ ErrorIfUnsupportedCitusLocalTableKind(Oid relationId)
}
/*
* ErrorIfUnsupportedCitusLocalColumnDefinition errors out if given relation
* has unsupported column definition for citus local table creation.
*/
static void
ErrorIfUnsupportedCitusLocalColumnDefinition(Relation relation)
{
TupleDesc relationDesc = RelationGetDescr(relation);
if (RelationUsesIdentityColumns(relationDesc))
{
/*
* pg_get_tableschemadef_string doesn't know how to deparse identity
* columns so we cannot reflect those columns when creating shell
* relation. For this reason, error out here.
*/
Oid relationId = relation->rd_id;
ereport(ERROR, (errmsg("cannot add %s to citus metadata since table "
"has identity column",
generate_qualified_relation_name(relationId)),
errhint("Drop the identity columns and re-try the command")));
}
}
/*
* NoticeIfAutoConvertingLocalTables logs a NOTICE message to inform the user that we are
* automatically adding local tables to metadata. The user should know that this table
@ -666,10 +647,12 @@ GetShellTableDDLEventsForCitusLocalTable(Oid relationId)
* a sequence.
*/
IncludeSequenceDefaults includeSequenceDefaults = NEXTVAL_SEQUENCE_DEFAULTS;
IncludeIdentities includeIdentityDefaults = INCLUDE_IDENTITY;
bool creatingShellTableOnRemoteNode = false;
List *tableDDLCommands = GetFullTableCreationCommands(relationId,
includeSequenceDefaults,
includeIdentityDefaults,
creatingShellTableOnRemoteNode);
List *shellTableDDLEvents = NIL;
@ -1040,6 +1023,46 @@ GetDropTriggerCommand(Oid relationId, char *triggerName)
}
/*
* DropIdentitiesOnTable drops the identities that depend on the given relation.
*/
static void
DropIdentitiesOnTable(Oid relationId)
{
Relation relation = relation_open(relationId, AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(relation);
relation_close(relation, NoLock);
for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts;
attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex);
char *columnName = NameStr(attributeForm->attname);
if (attributeForm->attidentity)
{
char *tableName = get_rel_name(relationId);
char *schemaName = get_namespace_name(get_rel_namespace(relationId));
char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName);
StringInfo dropCommand = makeStringInfo();
appendStringInfo(dropCommand, "ALTER TABLE %s ALTER %s DROP IDENTITY",
qualifiedTableName,
columnName);
/*
* We need to disable/enable ddl propagation for this command, to prevent
* sending unnecessary ALTER COLUMN commands for partitions, to MX workers.
*/
ExecuteAndLogUtilityCommandList(list_make3(DISABLE_DDL_PROPAGATION,
dropCommand->data,
ENABLE_DDL_PROPAGATION));
}
}
}
/*
* DropViewsOnTable drops the views that depend on the given relation.
*/

View File

@ -1627,16 +1627,9 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
ErrorIfTableIsACatalogTable(relation);
/* verify target relation does not use identity columns */
if (RelationUsesIdentityColumns(relationDesc))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation: %s", relationName),
errdetail("Distributed relations must not use GENERATED "
"... AS IDENTITY.")));
}
/* verify target relation is not distributed by a generated columns */
/* verify target relation is not distributed by a generated stored column
*/
if (distributionMethod != DISTRIBUTE_BY_NONE &&
DistributionColumnUsesGeneratedStoredColumn(relationDesc, distributionColumn))
{

View File

@ -370,6 +370,7 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
bool creatingShellTableOnRemoteNode = true;
List *tableDDLCommands = GetFullTableCreationCommands(relationId,
WORKER_NEXTVAL_SEQUENCE_DEFAULTS,
INCLUDE_IDENTITY_AS_SEQUENCE_DEFAULTS,
creatingShellTableOnRemoteNode);
TableDDLCommand *tableDDLCommand = NULL;
foreach_ptr(tableDDLCommand, tableDDLCommands)

View File

@ -103,6 +103,8 @@ static List * InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
const char *commandString);
static bool AlterInvolvesPartitionColumn(AlterTableStmt *alterTableStatement,
AlterTableCmd *command);
static bool AlterColumnInvolvesIdentityColumn(AlterTableStmt *alterTableStatement,
AlterTableCmd *command);
static void ErrorIfUnsupportedAlterAddConstraintStmt(AlterTableStmt *alterTableStatement);
static List * CreateRightShardListForInterShardDDLTask(Oid rightRelationId,
Oid leftRelationId,
@ -114,8 +116,7 @@ static void SetInterShardDDLTaskRelationShardList(Task *task,
ShardInterval *leftShardInterval,
ShardInterval *rightShardInterval);
static Oid get_attrdef_oid(Oid relationId, AttrNumber attnum);
static char * GetAlterColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId,
char *colname);
static char * GetAddColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId,
char *colname, TypeName *typeName);
@ -1239,6 +1240,30 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
}
}
/*
* We check for ADD COLUMN .. GENERATED .. AS IDENTITY expr
* since it uses a sequence as an internal dependency
* we should deparse the statement
*/
constraint = NULL;
foreach_ptr(constraint, columnConstraints)
{
if (constraint->contype == CONSTR_IDENTITY)
{
deparseAT = true;
useInitialDDLCommandString = false;
/*
* Since we don't support constraints for AT_AddColumn
* we have to set is_not_null to true explicitly for identity columns
*/
ColumnDef *newColDef = copyObject(columnDefinition);
newColDef->constraints = NULL;
newColDef->is_not_null = true;
newCmd->def = (Node *) newColDef;
}
}
/*
* We check for ADD COLUMN .. SERIAL pseudo-type
* if that's the case, we should deparse the statement
@ -2369,6 +2394,34 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
}
}
}
/*
* We check for ADD COLUMN .. GENERATED AS IDENTITY expr
* since it uses a seqeunce as an internal dependency
*/
constraint = NULL;
foreach_ptr(constraint, columnConstraints)
{
if (constraint->contype == CONSTR_IDENTITY)
{
AttrNumber attnum = get_attnum(relationId,
columnDefinition->colname);
bool missing_ok = false;
Oid seqOid = getIdentitySequence(relationId, attnum, missing_ok);
if (ShouldSyncTableMetadata(relationId))
{
needMetadataSyncForNewSequences = true;
alterTableDefaultNextvalCmd =
GetAddColumnWithNextvalDefaultCmd(seqOid,
relationId,
columnDefinition
->colname,
columnDefinition
->typeName);
}
}
}
}
/*
* We check for ALTER COLUMN .. SET DEFAULT nextval('user_defined_seq')
@ -2390,8 +2443,9 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
if (ShouldSyncTableMetadata(relationId))
{
needMetadataSyncForNewSequences = true;
bool missingTableOk = false;
alterTableDefaultNextvalCmd = GetAlterColumnWithNextvalDefaultCmd(
seqOid, relationId, command->name);
seqOid, relationId, command->name, missingTableOk);
}
}
}
@ -2579,8 +2633,9 @@ get_attrdef_oid(Oid relationId, AttrNumber attnum)
* ALTER TABLE ALTER COLUMN .. SET DEFAULT nextval()
* If sequence type is not bigint, we use worker_nextval() instead of nextval().
*/
static char *
GetAlterColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId, char *colname)
char *
GetAlterColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId, char *colname, bool
missingTableOk)
{
char *qualifiedSequenceName = generate_qualified_relation_name(sequenceOid);
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
@ -2599,9 +2654,18 @@ GetAlterColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId, char *colna
StringInfoData str = { 0 };
initStringInfo(&str);
appendStringInfo(&str, "ALTER TABLE %s ALTER COLUMN %s "
appendStringInfo(&str, "ALTER TABLE ");
if (missingTableOk)
{
appendStringInfo(&str, "IF EXISTS ");
}
appendStringInfo(&str, "%s ALTER COLUMN %s "
"SET DEFAULT %s(%s::regclass)",
qualifiedRelationName, colname,
qualifiedRelationName,
colname,
quote_qualified_identifier("pg_catalog", nextvalFunctionName),
quote_literal_cstr(qualifiedSequenceName));
@ -2971,6 +3035,26 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
}
}
Constraint *columnConstraint = NULL;
foreach_ptr(columnConstraint, column->constraints)
{
if (columnConstraint->contype == CONSTR_IDENTITY)
{
/*
* Currently we don't support backfilling the new identity column with default values
* if the table is not empty
*/
if (!TableEmpty(relationId))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"Cannot add an identity column because the table is not empty")));
}
}
}
List *columnConstraints = column->constraints;
Constraint *constraint = NULL;
@ -3067,10 +3151,23 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
"involving partition column")));
}
/*
* We check for ALTER COLUMN TYPE ...
* if the column is an identity column,
* changing the type of the column
* should not be allowed for now
*/
if (AlterColumnInvolvesIdentityColumn(alterTableStatement, command))
{
ereport(ERROR, (errmsg("cannot execute ALTER COLUMN command "
"involving identity column")));
}
/*
* We check for ALTER COLUMN TYPE ...
* if the column has default coming from a user-defined sequence
* changing the type of the column should not be allowed for now
* changing the type of the column
* should not be allowed for now
*/
AttrNumber attnum = get_attnum(relationId, command->name);
List *seqInfoList = NIL;
@ -3578,6 +3675,41 @@ SetInterShardDDLTaskRelationShardList(Task *task, ShardInterval *leftShardInterv
}
/*
* AlterColumnInvolvesIdentityColumn checks if the given alter column command
* involves relation's identity column.
*/
static bool
AlterColumnInvolvesIdentityColumn(AlterTableStmt *alterTableStatement,
AlterTableCmd *command)
{
bool involvesIdentityColumn = false;
char *alterColumnName = command->name;
LOCKMODE lockmode = AlterTableGetLockLevel(alterTableStatement->cmds);
Oid relationId = AlterTableLookupRelation(alterTableStatement, lockmode);
if (!OidIsValid(relationId))
{
return false;
}
HeapTuple tuple = SearchSysCacheAttName(relationId, alterColumnName);
if (HeapTupleIsValid(tuple))
{
Form_pg_attribute targetAttr = (Form_pg_attribute) GETSTRUCT(tuple);
if (targetAttr->attidentity)
{
involvesIdentityColumn = true;
}
ReleaseSysCache(tuple);
}
return involvesIdentityColumn;
}
/*
* AlterInvolvesPartitionColumn checks if the given alter table command
* involves relation's partition column.

View File

@ -73,6 +73,7 @@
#include "utils/relcache.h"
#include "utils/ruleutils.h"
#include "utils/syscache.h"
#include "commands/sequence.h"
static void deparse_index_columns(StringInfo buffer, List *indexParameterList,
@ -302,10 +303,16 @@ pg_get_sequencedef(Oid sequenceRelationId)
* DEFAULT clauses for columns getting their default values from a sequence.
* When it's WORKER_NEXTVAL_SEQUENCE_DEFAULTS, the function creates the DEFAULT
* clause using worker_nextval('sequence') and not nextval('sequence')
* When IncludeIdentities is NO_IDENTITY, the function does not include identity column
* specifications. When it's INCLUDE_IDENTITY_AS_SEQUENCE_DEFAULTS, the function
* uses sequences and set them as default values for identity columns by using exactly
* the same approach with worker_nextval('sequence') & nextval('sequence') logic
* desribed above. When it's INCLUDE_IDENTITY it creates GENERATED .. AS IDENTIY clauses.
*/
char *
pg_get_tableschemadef_string(Oid tableRelationId, IncludeSequenceDefaults
includeSequenceDefaults, char *accessMethod)
includeSequenceDefaults, IncludeIdentities
includeIdentityDefaults, char *accessMethod)
{
bool firstAttributePrinted = false;
AttrNumber defaultValueIndex = 0;
@ -390,6 +397,50 @@ pg_get_tableschemadef_string(Oid tableRelationId, IncludeSequenceDefaults
}
#endif
if (attributeForm->attidentity && includeIdentityDefaults)
{
bool missing_ok = false;
Oid seqOid = getIdentitySequence(RelationGetRelid(relation),
attributeForm->attnum, missing_ok);
char *sequenceName = generate_qualified_relation_name(seqOid);
if (includeIdentityDefaults == INCLUDE_IDENTITY_AS_SEQUENCE_DEFAULTS)
{
if (pg_get_sequencedef(seqOid)->seqtypid != INT8OID)
{
appendStringInfo(&buffer,
" DEFAULT worker_nextval(%s::regclass)",
quote_literal_cstr(sequenceName));
}
else
{
appendStringInfo(&buffer, " DEFAULT nextval(%s::regclass)",
quote_literal_cstr(sequenceName));
}
}
else if (includeIdentityDefaults == INCLUDE_IDENTITY)
{
Form_pg_sequence pgSequenceForm = pg_get_sequencedef(seqOid);
uint64 sequenceStart = nextval_internal(seqOid, false);
char *sequenceDef = psprintf(
" GENERATED %s AS IDENTITY (INCREMENT BY " INT64_FORMAT \
" MINVALUE " INT64_FORMAT " MAXVALUE "
INT64_FORMAT \
" START WITH " INT64_FORMAT " CACHE "
INT64_FORMAT " %sCYCLE)",
attributeForm->attidentity == ATTRIBUTE_IDENTITY_ALWAYS ?
"ALWAYS" : "BY DEFAULT",
pgSequenceForm->seqincrement,
pgSequenceForm->seqmin,
pgSequenceForm->seqmax, sequenceStart,
pgSequenceForm->seqcache,
pgSequenceForm->seqcycle ? "" : "NO ");
appendStringInfo(&buffer, "%s", sequenceDef);
}
}
/* if this column has a default value, append the default value */
if (attributeForm->atthasdef)
{

View File

@ -1635,7 +1635,8 @@ GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList,
attrdefResult = lappend_oid(attrdefResult, deprec->objid);
attrdefAttnumResult = lappend_int(attrdefAttnumResult, deprec->refobjsubid);
}
else if (deprec->deptype == DEPENDENCY_AUTO &&
else if ((deprec->deptype == DEPENDENCY_AUTO || deprec->deptype ==
DEPENDENCY_INTERNAL) &&
deprec->refobjsubid != 0 &&
deprec->classid == RelationRelationId &&
get_rel_relkind(deprec->objid) == RELKIND_SEQUENCE)
@ -2604,9 +2605,13 @@ CreateShellTableOnWorkers(Oid relationId)
List *commandList = list_make1(DISABLE_DDL_PROPAGATION);
IncludeSequenceDefaults includeSequenceDefaults = WORKER_NEXTVAL_SEQUENCE_DEFAULTS;
IncludeIdentities includeIdentityDefaults =
INCLUDE_IDENTITY_AS_SEQUENCE_DEFAULTS;
bool creatingShellTableOnRemoteNode = true;
List *tableDDLCommands = GetFullTableCreationCommands(relationId,
includeSequenceDefaults,
includeIdentityDefaults,
creatingShellTableOnRemoteNode);
TableDDLCommand *tableDDLCommand = NULL;

View File

@ -131,6 +131,7 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
text *relationName = PG_GETARG_TEXT_P(0);
Oid relationId = ResolveRelationId(relationName, false);
IncludeSequenceDefaults includeSequenceDefaults = NEXTVAL_SEQUENCE_DEFAULTS;
IncludeIdentities includeIdentityDefaults = INCLUDE_IDENTITY;
/* create a function context for cross-call persistence */
@ -144,6 +145,7 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
bool creatingShellTableOnRemoteNode = false;
List *tableDDLEventList = GetFullTableCreationCommands(relationId,
includeSequenceDefaults,
includeIdentityDefaults,
creatingShellTableOnRemoteNode);
tableDDLEventCell = list_head(tableDDLEventList);
ListCellAndListWrapper *wrapper = palloc0(sizeof(ListCellAndListWrapper));
@ -458,16 +460,23 @@ ResolveRelationId(text *relationName, bool missingOk)
* These DDL commands are all palloced; and include the table's schema
* definition, optional column storage and statistics definitions, and index
* constraint and trigger definitions.
* When IncludeIdentities is NO_IDENTITY, the function does not include identity column
* specifications. When it's INCLUDE_IDENTITY_AS_SEQUENCE_DEFAULTS, the function
* uses sequences and set them as default values for identity columns by using exactly
* the same approach with worker_nextval('sequence') & nextval('sequence') logic
* desribed above. When it's INCLUDE_IDENTITY it creates GENERATED .. AS IDENTIY clauses.
*/
List *
GetFullTableCreationCommands(Oid relationId,
IncludeSequenceDefaults includeSequenceDefaults,
IncludeIdentities includeIdentityDefaults,
bool creatingShellTableOnRemoteNode)
{
List *tableDDLEventList = NIL;
List *preLoadCreationCommandList =
GetPreLoadTableCreationCommands(relationId, includeSequenceDefaults, NULL);
GetPreLoadTableCreationCommands(relationId, includeSequenceDefaults,
includeIdentityDefaults, NULL);
tableDDLEventList = list_concat(tableDDLEventList, preLoadCreationCommandList);
@ -592,6 +601,7 @@ GetTableReplicaIdentityCommand(Oid relationId)
List *
GetPreLoadTableCreationCommands(Oid relationId,
IncludeSequenceDefaults includeSequenceDefaults,
IncludeIdentities includeIdentityDefaults,
char *accessMethod)
{
List *tableDDLEventList = NIL;
@ -601,6 +611,7 @@ GetPreLoadTableCreationCommands(Oid relationId,
/* fetch table schema and column option definitions */
char *tableSchemaDef = pg_get_tableschemadef_string(relationId,
includeSequenceDefaults,
includeIdentityDefaults,
accessMethod);
char *tableColumnOptionsDef = pg_get_tablecolumnoptionsdef_string(relationId);

View File

@ -708,6 +708,7 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
List *splitShardCreationCommandList = GetPreLoadTableCreationCommands(
shardInterval->relationId,
false, /* includeSequenceDefaults */
false, /* includeIdentityDefaults */
NULL /* auto add columnar options for cstore tables */);
splitShardCreationCommandList = WorkerApplyShardDDLCommandList(
splitShardCreationCommandList,
@ -1688,6 +1689,7 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
List *splitShardCreationCommandList = GetPreLoadTableCreationCommands(
shardInterval->relationId,
false, /* includeSequenceDefaults */
false, /* includeIdentityDefaults */
NULL /* auto add columnar options for cstore tables */);
splitShardCreationCommandList = WorkerApplyShardDDLCommandList(
splitShardCreationCommandList,
@ -1750,6 +1752,7 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
List *splitShardCreationCommandList = GetPreLoadTableCreationCommands(
shardInterval->relationId,
false, /* includeSequenceDefaults */
false, /* includeIdentityDefaults */
NULL /* auto add columnar options for cstore tables */);
splitShardCreationCommandList = WorkerApplyShardDDLCommandList(
splitShardCreationCommandList,

View File

@ -1945,6 +1945,7 @@ RecreateTableDDLCommandList(Oid relationId)
StringInfo dropCommand = makeStringInfo();
IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS;
IncludeIdentities includeIdentityDefaults = NO_IDENTITY;
/* build appropriate DROP command based on relation kind */
if (RegularTable(relationId))
@ -1967,6 +1968,7 @@ RecreateTableDDLCommandList(Oid relationId)
List *dropCommandList = list_make1(makeTableDDLCommandString(dropCommand->data));
List *createCommandList = GetPreLoadTableCreationCommands(relationId,
includeSequenceDefaults,
includeIdentityDefaults,
NULL);
List *recreateCommandList = list_concat(dropCommandList, createCommandList);

View File

@ -308,9 +308,12 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
List *foreignConstraintCommandList =
GetReferencingForeignConstaintCommands(relationId);
IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS;
IncludeIdentities includeIdentityDefaults = NO_IDENTITY;
bool creatingShellTableOnRemoteNode = false;
List *ddlCommandList = GetFullTableCreationCommands(relationId,
includeSequenceDefaults,
includeIdentityDefaults,
creatingShellTableOnRemoteNode);
uint32 connectionFlag = FOR_DDL;
char *relationOwner = TableOwner(relationId);
@ -420,9 +423,12 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
bool useExclusiveConnection, bool colocatedShard)
{
IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS;
IncludeIdentities includeIdentityDefaults = NO_IDENTITY;
bool creatingShellTableOnRemoteNode = false;
List *ddlCommandList = GetFullTableCreationCommands(distributedRelationId,
includeSequenceDefaults,
includeIdentityDefaults,
creatingShellTableOnRemoteNode);
List *foreignConstraintCommandList =
GetReferencingForeignConstaintCommands(distributedRelationId);

View File

@ -254,6 +254,7 @@ ShouldEvaluateExpression(Expr *expression)
case T_RowCompareExpr:
case T_RelabelType:
case T_CoerceToDomain:
case T_NextValueExpr:
{
return true;
}

View File

@ -29,6 +29,7 @@ extern char * pg_get_sequencedef_string(Oid sequenceRelid);
extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId);
extern char * pg_get_tableschemadef_string(Oid tableRelationId,
IncludeSequenceDefaults includeSequenceDefaults,
IncludeIdentities includeIdentityDefaults,
char *accessMethod);
extern void EnsureRelationKindSupported(Oid relationId);
extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId);

View File

@ -563,7 +563,8 @@ extern List * MakeNameListFromRangeVar(const RangeVar *rel);
extern Oid GetSequenceOid(Oid relationId, AttrNumber attnum);
extern bool ConstrTypeUsesIndex(ConstrType constrType);
extern bool ConstrTypeCitusCanDefaultName(ConstrType constrType);
extern char * GetAlterColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId,
char *colname, bool missingTableOk);
/* text_search.c - forward declarations */
extern List * GetCreateTextSearchConfigStatements(const ObjectAddress *address);

View File

@ -117,6 +117,18 @@ typedef enum IncludeSequenceDefaults
} IncludeSequenceDefaults;
/*
* IncludeIdentities decides on how we include identity information
* when creating the definition of a table.
*/
typedef enum IncludeIdentities
{
NO_IDENTITY = 0, /* don't include identities */
INCLUDE_IDENTITY_AS_SEQUENCE_DEFAULTS = 1, /* include identities as sequences */
INCLUDE_IDENTITY = 2 /* include identities as-is*/
} IncludeIdentities;
struct TableDDLCommand;
typedef struct TableDDLCommand TableDDLCommand;
typedef char *(*TableDDLFunction)(void *context);
@ -213,11 +225,14 @@ extern uint64 GetNextPlacementId(void);
extern Oid ResolveRelationId(text *relationName, bool missingOk);
extern List * GetFullTableCreationCommands(Oid relationId,
IncludeSequenceDefaults includeSequenceDefaults,
IncludeIdentities includeIdentityDefaults,
bool creatingShellTableOnRemoteNode);
extern List * GetPostLoadTableCreationCommands(Oid relationId, bool includeIndexes,
bool includeReplicaIdentity);
extern List * GetPreLoadTableCreationCommands(Oid relationId, IncludeSequenceDefaults
includeSequenceDefaults,
IncludeIdentities
includeIdentityDefaults,
char *accessMethod);
extern List * GetTableRowLevelSecurityCommands(Oid relationId);
extern List * GetTableIndexAndConstraintCommands(Oid relationId, int indexFlags);

View File

@ -1075,9 +1075,9 @@ NOTICE: creating a new table for alter_distributed_table.abcde_0123456789012345
NOTICE: moving the data of alter_distributed_table.abcde_012345678901234567890123456789012345678901234567890123456
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
CONTEXT: SQL statement "INSERT INTO alter_distributed_table.abcde_0123456789012345678901234567890123456_f7ff6612_4160710162 (x,y) SELECT x,y FROM alter_distributed_table.abcde_012345678901234567890123456789012345678901234567890123456"
CONTEXT: SQL statement "INSERT INTO alter_distributed_table.abcde_0123456789012345678901234567890123456_f7ff6612_4160710162 (x,y) OVERRIDING SYSTEM VALUE SELECT x,y FROM alter_distributed_table.abcde_012345678901234567890123456789012345678901234567890123456"
DEBUG: performing repartitioned INSERT ... SELECT
CONTEXT: SQL statement "INSERT INTO alter_distributed_table.abcde_0123456789012345678901234567890123456_f7ff6612_4160710162 (x,y) SELECT x,y FROM alter_distributed_table.abcde_012345678901234567890123456789012345678901234567890123456"
CONTEXT: SQL statement "INSERT INTO alter_distributed_table.abcde_0123456789012345678901234567890123456_f7ff6612_4160710162 (x,y) OVERRIDING SYSTEM VALUE SELECT x,y FROM alter_distributed_table.abcde_012345678901234567890123456789012345678901234567890123456"
NOTICE: dropping the old alter_distributed_table.abcde_012345678901234567890123456789012345678901234567890123456
CONTEXT: SQL statement "DROP TABLE alter_distributed_table.abcde_012345678901234567890123456789012345678901234567890123456 CASCADE"
NOTICE: renaming the new table to alter_distributed_table.abcde_012345678901234567890123456789012345678901234567890123456

View File

@ -733,11 +733,6 @@ SELECT relname, relkind
v_ref | v
(6 rows)
CREATE TABLE identity_cols_test (a int, b int generated by default as identity (increment by 42));
-- errors out since we don't support alter_table.* udfs with tables having any identity columns
SELECT alter_table_set_access_method('identity_cols_test', 'columnar');
ERROR: cannot complete command because relation alter_table_set_access_method.identity_cols_test has identity column
HINT: Drop the identity columns and re-try the command
-- test long table names
SET client_min_messages TO DEBUG1;
CREATE TABLE abcde_0123456789012345678901234567890123456789012345678901234567890123456789 (x int, y int);

View File

@ -62,18 +62,6 @@ BEGIN;
SELECT citus_add_local_table_to_metadata('temp_table');
ERROR: constraints on temporary tables may reference only temporary tables
ROLLBACK;
-- below two errors out since we don't support adding local tables
-- having any identity columns to metadata
BEGIN;
CREATE TABLE identity_cols_test (a int generated by default as identity (start with 42));
SELECT citus_add_local_table_to_metadata('identity_cols_test');
ERROR: cannot add citus_local_tables_test_schema.identity_cols_test to citus metadata since table has identity column
ROLLBACK;
BEGIN;
CREATE TABLE identity_cols_test (a int generated always as identity (increment by 42));
SELECT citus_add_local_table_to_metadata('identity_cols_test');
ERROR: cannot add citus_local_tables_test_schema.identity_cols_test to citus metadata since table has identity column
ROLLBACK;
-- creating citus local table having no data initially would work
SELECT citus_add_local_table_to_metadata('citus_local_table_1');
citus_add_local_table_to_metadata

View File

@ -0,0 +1,525 @@
CREATE SCHEMA generated_identities;
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SELECT 1 from citus_add_node('localhost', :master_port, groupId=>0);
?column?
---------------------------------------------------------------------
1
(1 row)
DROP TABLE IF EXISTS generated_identities_test;
-- create a partitioned table for testing.
CREATE TABLE generated_identities_test (
a int CONSTRAINT myconname GENERATED BY DEFAULT AS IDENTITY,
b bigint GENERATED ALWAYS AS IDENTITY (START WITH 10 INCREMENT BY 10),
c smallint GENERATED BY DEFAULT AS IDENTITY,
d serial,
e bigserial,
f smallserial,
g int
)
PARTITION BY RANGE (a);
CREATE TABLE generated_identities_test_1_5 PARTITION OF generated_identities_test FOR VALUES FROM (1) TO (5);
CREATE TABLE generated_identities_test_5_50 PARTITION OF generated_identities_test FOR VALUES FROM (5) TO (50);
-- local tables
SELECT citus_add_local_table_to_metadata('generated_identities_test');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
\d generated_identities_test
Partitioned table "generated_identities.generated_identities_test"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | integer | | not null | generated by default as identity
b | bigint | | not null | generated always as identity
c | smallint | | not null | generated by default as identity
d | integer | | not null | nextval('generated_identities_test_d_seq'::regclass)
e | bigint | | not null | nextval('generated_identities_test_e_seq'::regclass)
f | smallint | | not null | nextval('generated_identities_test_f_seq'::regclass)
g | integer | | |
Partition key: RANGE (a)
Number of partitions: 2 (Use \d+ to list them.)
\c - - - :worker_1_port
\d generated_identities.generated_identities_test
Partitioned table "generated_identities.generated_identities_test"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | integer | | not null | worker_nextval('generated_identities.generated_identities_test_a_seq'::regclass)
b | bigint | | not null | nextval('generated_identities.generated_identities_test_b_seq'::regclass)
c | smallint | | not null | worker_nextval('generated_identities.generated_identities_test_c_seq'::regclass)
d | integer | | not null | worker_nextval('generated_identities.generated_identities_test_d_seq'::regclass)
e | bigint | | not null | nextval('generated_identities.generated_identities_test_e_seq'::regclass)
f | smallint | | not null | worker_nextval('generated_identities.generated_identities_test_f_seq'::regclass)
g | integer | | |
Partition key: RANGE (a)
Number of partitions: 2 (Use \d+ to list them.)
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SELECT undistribute_table('generated_identities_test');
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT citus_remove_node('localhost', :master_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('generated_identities_test', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\d generated_identities_test
Partitioned table "generated_identities.generated_identities_test"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | integer | | not null | generated by default as identity
b | bigint | | not null | generated always as identity
c | smallint | | not null | generated by default as identity
d | integer | | not null | nextval('generated_identities_test_d_seq'::regclass)
e | bigint | | not null | nextval('generated_identities_test_e_seq'::regclass)
f | smallint | | not null | nextval('generated_identities_test_f_seq'::regclass)
g | integer | | |
Partition key: RANGE (a)
Number of partitions: 2 (Use \d+ to list them.)
\c - - - :worker_1_port
\d generated_identities.generated_identities_test
Partitioned table "generated_identities.generated_identities_test"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | integer | | not null | worker_nextval('generated_identities.generated_identities_test_a_seq'::regclass)
b | bigint | | not null | nextval('generated_identities.generated_identities_test_b_seq'::regclass)
c | smallint | | not null | worker_nextval('generated_identities.generated_identities_test_c_seq'::regclass)
d | integer | | not null | worker_nextval('generated_identities.generated_identities_test_d_seq'::regclass)
e | bigint | | not null | nextval('generated_identities.generated_identities_test_e_seq'::regclass)
f | smallint | | not null | worker_nextval('generated_identities.generated_identities_test_f_seq'::regclass)
g | integer | | |
Partition key: RANGE (a)
Number of partitions: 2 (Use \d+ to list them.)
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
insert into generated_identities_test (g) values (1);
insert into generated_identities_test (g) SELECT 2;
INSERT INTO generated_identities_test (g)
SELECT s FROM generate_series(3,7) s;
SELECT * FROM generated_identities_test ORDER BY 1;
a | b | c | d | e | f | g
---------------------------------------------------------------------
1 | 10 | 1 | 1 | 1 | 1 | 1
2 | 20 | 2 | 2 | 2 | 2 | 2
3 | 30 | 3 | 3 | 3 | 3 | 3
4 | 40 | 4 | 4 | 4 | 4 | 4
5 | 50 | 5 | 5 | 5 | 5 | 5
6 | 60 | 6 | 6 | 6 | 6 | 6
7 | 70 | 7 | 7 | 7 | 7 | 7
(7 rows)
SELECT undistribute_table('generated_identities_test');
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT * FROM generated_identities_test ORDER BY 1;
a | b | c | d | e | f | g
---------------------------------------------------------------------
1 | 10 | 1 | 1 | 1 | 1 | 1
2 | 20 | 2 | 2 | 2 | 2 | 2
3 | 30 | 3 | 3 | 3 | 3 | 3
4 | 40 | 4 | 4 | 4 | 4 | 4
5 | 50 | 5 | 5 | 5 | 5 | 5
6 | 60 | 6 | 6 | 6 | 6 | 6
7 | 70 | 7 | 7 | 7 | 7 | 7
(7 rows)
\d generated_identities_test
Partitioned table "generated_identities.generated_identities_test"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | integer | | not null | generated by default as identity
b | bigint | | not null | generated always as identity
c | smallint | | not null | generated by default as identity
d | integer | | not null | nextval('generated_identities_test_d_seq'::regclass)
e | bigint | | not null | nextval('generated_identities_test_e_seq'::regclass)
f | smallint | | not null | nextval('generated_identities_test_f_seq'::regclass)
g | integer | | |
Partition key: RANGE (a)
Number of partitions: 2 (Use \d+ to list them.)
\c - - - :worker_1_port
\d generated_identities.generated_identities_test
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO generated_identities_test (g)
SELECT s FROM generate_series(8,10) s;
SELECT * FROM generated_identities_test ORDER BY 1;
a | b | c | d | e | f | g
---------------------------------------------------------------------
1 | 10 | 1 | 1 | 1 | 1 | 1
2 | 20 | 2 | 2 | 2 | 2 | 2
3 | 30 | 3 | 3 | 3 | 3 | 3
4 | 40 | 4 | 4 | 4 | 4 | 4
5 | 50 | 5 | 5 | 5 | 5 | 5
6 | 60 | 6 | 6 | 6 | 6 | 6
7 | 70 | 7 | 7 | 7 | 7 | 7
8 | 80 | 8 | 8 | 8 | 8 | 8
9 | 90 | 9 | 9 | 9 | 9 | 9
10 | 100 | 10 | 10 | 10 | 10 | 10
(10 rows)
-- distributed table
SELECT create_distributed_table('generated_identities_test', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- alter table .. alter column .. add is unsupported
ALTER TABLE generated_identities_test ALTER COLUMN g ADD GENERATED ALWAYS AS IDENTITY;
ERROR: alter table command is currently unsupported
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP|VALIDATE CONSTRAINT, SET (), RESET (), ENABLE|DISABLE|NO FORCE|FORCE ROW LEVEL SECURITY, ATTACH|DETACH PARTITION and TYPE subcommands are supported.
-- alter table .. alter column is unsupported
ALTER TABLE generated_identities_test ALTER COLUMN b TYPE int;
ERROR: cannot execute ALTER COLUMN command involving identity column
SELECT alter_distributed_table('generated_identities_test', 'g');
alter_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT alter_distributed_table('generated_identities_test', 'b');
alter_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT alter_distributed_table('generated_identities_test', 'c');
alter_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT undistribute_table('generated_identities_test');
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT * FROM generated_identities_test ORDER BY g;
a | b | c | d | e | f | g
---------------------------------------------------------------------
1 | 10 | 1 | 1 | 1 | 1 | 1
2 | 20 | 2 | 2 | 2 | 2 | 2
3 | 30 | 3 | 3 | 3 | 3 | 3
4 | 40 | 4 | 4 | 4 | 4 | 4
5 | 50 | 5 | 5 | 5 | 5 | 5
6 | 60 | 6 | 6 | 6 | 6 | 6
7 | 70 | 7 | 7 | 7 | 7 | 7
8 | 80 | 8 | 8 | 8 | 8 | 8
9 | 90 | 9 | 9 | 9 | 9 | 9
10 | 100 | 10 | 10 | 10 | 10 | 10
(10 rows)
-- reference table
DROP TABLE generated_identities_test;
CREATE TABLE generated_identities_test (
a int GENERATED BY DEFAULT AS IDENTITY,
b bigint GENERATED ALWAYS AS IDENTITY (START WITH 10 INCREMENT BY 10),
c smallint GENERATED BY DEFAULT AS IDENTITY,
d serial,
e bigserial,
f smallserial,
g int
);
SELECT create_reference_table('generated_identities_test');
create_reference_table
---------------------------------------------------------------------
(1 row)
\d generated_identities_test
Table "generated_identities.generated_identities_test"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | integer | | not null | generated by default as identity
b | bigint | | not null | generated always as identity
c | smallint | | not null | generated by default as identity
d | integer | | not null | nextval('generated_identities_test_d_seq'::regclass)
e | bigint | | not null | nextval('generated_identities_test_e_seq'::regclass)
f | smallint | | not null | nextval('generated_identities_test_f_seq'::regclass)
g | integer | | |
\c - - - :worker_1_port
\d generated_identities.generated_identities_test
Table "generated_identities.generated_identities_test"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | integer | | not null | worker_nextval('generated_identities.generated_identities_test_a_seq'::regclass)
b | bigint | | not null | nextval('generated_identities.generated_identities_test_b_seq'::regclass)
c | smallint | | not null | worker_nextval('generated_identities.generated_identities_test_c_seq'::regclass)
d | integer | | not null | worker_nextval('generated_identities.generated_identities_test_d_seq'::regclass)
e | bigint | | not null | nextval('generated_identities.generated_identities_test_e_seq'::regclass)
f | smallint | | not null | worker_nextval('generated_identities.generated_identities_test_f_seq'::regclass)
g | integer | | |
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO generated_identities_test (g)
SELECT s FROM generate_series(11,20) s;
SELECT * FROM generated_identities_test ORDER BY g;
a | b | c | d | e | f | g
---------------------------------------------------------------------
1 | 10 | 1 | 1 | 1 | 1 | 11
2 | 20 | 2 | 2 | 2 | 2 | 12
3 | 30 | 3 | 3 | 3 | 3 | 13
4 | 40 | 4 | 4 | 4 | 4 | 14
5 | 50 | 5 | 5 | 5 | 5 | 15
6 | 60 | 6 | 6 | 6 | 6 | 16
7 | 70 | 7 | 7 | 7 | 7 | 17
8 | 80 | 8 | 8 | 8 | 8 | 18
9 | 90 | 9 | 9 | 9 | 9 | 19
10 | 100 | 10 | 10 | 10 | 10 | 20
(10 rows)
SELECT undistribute_table('generated_identities_test');
undistribute_table
---------------------------------------------------------------------
(1 row)
\d generated_identities_test
Table "generated_identities.generated_identities_test"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | integer | | not null | generated by default as identity
b | bigint | | not null | generated always as identity
c | smallint | | not null | generated by default as identity
d | integer | | not null | nextval('generated_identities_test_d_seq'::regclass)
e | bigint | | not null | nextval('generated_identities_test_e_seq'::regclass)
f | smallint | | not null | nextval('generated_identities_test_f_seq'::regclass)
g | integer | | |
\c - - - :worker_1_port
\d generated_identities.generated_identities_test
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
-- alter table .. add column .. GENERATED .. AS IDENTITY
DROP TABLE IF EXISTS color;
CREATE TABLE color (
color_name VARCHAR NOT NULL
);
SELECT create_distributed_table('color', 'color_name');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER TABLE color ADD COLUMN color_id BIGINT GENERATED ALWAYS AS IDENTITY;
INSERT INTO color(color_name) VALUES ('Red');
ALTER TABLE color ADD COLUMN color_id_1 BIGINT GENERATED ALWAYS AS IDENTITY;
ERROR: Cannot add an identity column because the table is not empty
DROP TABLE color;
-- insert data from workers
CREATE TABLE color (
color_id BIGINT GENERATED ALWAYS AS IDENTITY UNIQUE,
color_name VARCHAR NOT NULL
);
SELECT create_distributed_table('color', 'color_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SELECT undistribute_table('color');
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('color', 'color_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO color(color_name) VALUES ('Red');
SELECT count(*) from color;
count
---------------------------------------------------------------------
3
(1 row)
-- modify sequence & alter table
DROP TABLE color;
CREATE TABLE color (
color_id BIGINT GENERATED ALWAYS AS IDENTITY UNIQUE,
color_name VARCHAR NOT NULL
);
SELECT create_distributed_table('color', 'color_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SELECT undistribute_table('color');
undistribute_table
---------------------------------------------------------------------
(1 row)
ALTER SEQUENCE color_color_id_seq RENAME TO myseq;
SELECT create_distributed_table('color', 'color_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\ds+ myseq
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
generated_identities | myseq | sequence | postgres | permanent | 8192 bytes |
(1 row)
\ds+ color_color_id_seq
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
(0 rows)
\d color
Table "generated_identities.color"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
color_id | bigint | | not null | generated always as identity
color_name | character varying | | not null |
Indexes:
"color_color_id_key" UNIQUE CONSTRAINT, btree (color_id)
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
\ds+ myseq
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
generated_identities | myseq | sequence | postgres | permanent | 8192 bytes |
(1 row)
\ds+ color_color_id_seq
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
(0 rows)
\d color
Table "generated_identities.color"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
color_id | bigint | | not null | nextval('myseq'::regclass)
color_name | character varying | | not null |
Indexes:
"color_color_id_key" UNIQUE CONSTRAINT, btree (color_id)
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
ALTER SEQUENCE myseq RENAME TO color_color_id_seq;
\ds+ myseq
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
(0 rows)
\ds+ color_color_id_seq
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
generated_identities | color_color_id_seq | sequence | postgres | permanent | 8192 bytes |
(1 row)
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
\ds+ myseq
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
(0 rows)
\ds+ color_color_id_seq
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
generated_identities | color_color_id_seq | sequence | postgres | permanent | 8192 bytes |
(1 row)
\d color
Table "generated_identities.color"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
color_id | bigint | | not null | nextval('color_color_id_seq'::regclass)
color_name | character varying | | not null |
Indexes:
"color_color_id_key" UNIQUE CONSTRAINT, btree (color_id)
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SELECT alter_distributed_table('co23423lor', shard_count := 6);
ERROR: relation "co23423lor" does not exist
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
\ds+ color_color_id_seq
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
generated_identities | color_color_id_seq | sequence | postgres | permanent | 8192 bytes |
(1 row)
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
DROP SCHEMA generated_identities CASCADE;

View File

@ -1,20 +0,0 @@
--
-- MULTI_CREATE_TABLE_NEW_FEATURES
--
-- Verify that the GENERATED ... AS IDENTITY feature in PostgreSQL 10
-- is forbidden in distributed tables.
CREATE TABLE table_identity_col (
id integer GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
payload text );
SELECT create_distributed_table('table_identity_col', 'id', 'append');
ERROR: cannot distribute relation: table_identity_col
DETAIL: Distributed relations must not use GENERATED ... AS IDENTITY.
SELECT create_distributed_table('table_identity_col', 'id');
ERROR: cannot distribute relation: table_identity_col
DETAIL: Distributed relations must not use GENERATED ... AS IDENTITY.
SELECT create_distributed_table('table_identity_col', 'payload');
ERROR: cannot distribute relation: table_identity_col
DETAIL: Distributed relations must not use GENERATED ... AS IDENTITY.
SELECT create_reference_table('table_identity_col');
ERROR: cannot distribute relation: table_identity_col
DETAIL: Distributed relations must not use GENERATED ... AS IDENTITY.

View File

@ -68,7 +68,7 @@ test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_
test: multi_shard_update_delete recursive_dml_with_different_planners_executors
test: insert_select_repartition window_functions dml_recursive multi_insert_select_window
test: multi_insert_select_conflict citus_table_triggers
test: multi_row_insert insert_select_into_local_table multi_create_table_new_features alter_index
test: multi_row_insert insert_select_into_local_table alter_index
# following should not run in parallel because it relies on connection counts to workers
test: insert_select_connection_leak

View File

@ -62,7 +62,6 @@ test: multi_mx_alter_distributed_table
test: update_colocation_mx
test: resync_metadata_with_sequences
test: distributed_locks
# should be executed sequentially because it modifies metadata
test: local_shard_execution_dropped_column
test: metadata_sync_helpers

View File

@ -16,7 +16,7 @@ test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_
test: multi_shard_update_delete recursive_dml_with_different_planners_executors
test: insert_select_repartition window_functions dml_recursive multi_insert_select_window
test: multi_insert_select_conflict citus_table_triggers
test: multi_row_insert insert_select_into_local_table multi_create_table_new_features
test: multi_row_insert insert_select_into_local_table
test: multi_agg_approximate_distinct
test: tablespace
@ -120,3 +120,4 @@ test: ensure_no_intermediate_data_leak
test: ensure_no_shared_connection_leak
test: check_mx
test: generated_identity

View File

@ -246,10 +246,6 @@ SELECT relname, relkind
)
ORDER BY relname ASC;
CREATE TABLE identity_cols_test (a int, b int generated by default as identity (increment by 42));
-- errors out since we don't support alter_table.* udfs with tables having any identity columns
SELECT alter_table_set_access_method('identity_cols_test', 'columnar');
-- test long table names
SET client_min_messages TO DEBUG1;
CREATE TABLE abcde_0123456789012345678901234567890123456789012345678901234567890123456789 (x int, y int);

View File

@ -52,18 +52,6 @@ BEGIN;
SELECT citus_add_local_table_to_metadata('temp_table');
ROLLBACK;
-- below two errors out since we don't support adding local tables
-- having any identity columns to metadata
BEGIN;
CREATE TABLE identity_cols_test (a int generated by default as identity (start with 42));
SELECT citus_add_local_table_to_metadata('identity_cols_test');
ROLLBACK;
BEGIN;
CREATE TABLE identity_cols_test (a int generated always as identity (increment by 42));
SELECT citus_add_local_table_to_metadata('identity_cols_test');
ROLLBACK;
-- creating citus local table having no data initially would work
SELECT citus_add_local_table_to_metadata('citus_local_table_1');

View File

@ -0,0 +1,266 @@
CREATE SCHEMA generated_identities;
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SELECT 1 from citus_add_node('localhost', :master_port, groupId=>0);
DROP TABLE IF EXISTS generated_identities_test;
-- create a partitioned table for testing.
CREATE TABLE generated_identities_test (
a int CONSTRAINT myconname GENERATED BY DEFAULT AS IDENTITY,
b bigint GENERATED ALWAYS AS IDENTITY (START WITH 10 INCREMENT BY 10),
c smallint GENERATED BY DEFAULT AS IDENTITY,
d serial,
e bigserial,
f smallserial,
g int
)
PARTITION BY RANGE (a);
CREATE TABLE generated_identities_test_1_5 PARTITION OF generated_identities_test FOR VALUES FROM (1) TO (5);
CREATE TABLE generated_identities_test_5_50 PARTITION OF generated_identities_test FOR VALUES FROM (5) TO (50);
-- local tables
SELECT citus_add_local_table_to_metadata('generated_identities_test');
\d generated_identities_test
\c - - - :worker_1_port
\d generated_identities.generated_identities_test
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SELECT undistribute_table('generated_identities_test');
SELECT citus_remove_node('localhost', :master_port);
SELECT create_distributed_table('generated_identities_test', 'a');
\d generated_identities_test
\c - - - :worker_1_port
\d generated_identities.generated_identities_test
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
insert into generated_identities_test (g) values (1);
insert into generated_identities_test (g) SELECT 2;
INSERT INTO generated_identities_test (g)
SELECT s FROM generate_series(3,7) s;
SELECT * FROM generated_identities_test ORDER BY 1;
SELECT undistribute_table('generated_identities_test');
SELECT * FROM generated_identities_test ORDER BY 1;
\d generated_identities_test
\c - - - :worker_1_port
\d generated_identities.generated_identities_test
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO generated_identities_test (g)
SELECT s FROM generate_series(8,10) s;
SELECT * FROM generated_identities_test ORDER BY 1;
-- distributed table
SELECT create_distributed_table('generated_identities_test', 'a');
-- alter table .. alter column .. add is unsupported
ALTER TABLE generated_identities_test ALTER COLUMN g ADD GENERATED ALWAYS AS IDENTITY;
-- alter table .. alter column is unsupported
ALTER TABLE generated_identities_test ALTER COLUMN b TYPE int;
SELECT alter_distributed_table('generated_identities_test', 'g');
SELECT alter_distributed_table('generated_identities_test', 'b');
SELECT alter_distributed_table('generated_identities_test', 'c');
SELECT undistribute_table('generated_identities_test');
SELECT * FROM generated_identities_test ORDER BY g;
-- reference table
DROP TABLE generated_identities_test;
CREATE TABLE generated_identities_test (
a int GENERATED BY DEFAULT AS IDENTITY,
b bigint GENERATED ALWAYS AS IDENTITY (START WITH 10 INCREMENT BY 10),
c smallint GENERATED BY DEFAULT AS IDENTITY,
d serial,
e bigserial,
f smallserial,
g int
);
SELECT create_reference_table('generated_identities_test');
\d generated_identities_test
\c - - - :worker_1_port
\d generated_identities.generated_identities_test
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO generated_identities_test (g)
SELECT s FROM generate_series(11,20) s;
SELECT * FROM generated_identities_test ORDER BY g;
SELECT undistribute_table('generated_identities_test');
\d generated_identities_test
\c - - - :worker_1_port
\d generated_identities.generated_identities_test
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
-- alter table .. add column .. GENERATED .. AS IDENTITY
DROP TABLE IF EXISTS color;
CREATE TABLE color (
color_name VARCHAR NOT NULL
);
SELECT create_distributed_table('color', 'color_name');
ALTER TABLE color ADD COLUMN color_id BIGINT GENERATED ALWAYS AS IDENTITY;
INSERT INTO color(color_name) VALUES ('Red');
ALTER TABLE color ADD COLUMN color_id_1 BIGINT GENERATED ALWAYS AS IDENTITY;
DROP TABLE color;
-- insert data from workers
CREATE TABLE color (
color_id BIGINT GENERATED ALWAYS AS IDENTITY UNIQUE,
color_name VARCHAR NOT NULL
);
SELECT create_distributed_table('color', 'color_id');
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SELECT undistribute_table('color');
SELECT create_distributed_table('color', 'color_id');
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO color(color_name) VALUES ('Red');
SELECT count(*) from color;
-- modify sequence & alter table
DROP TABLE color;
CREATE TABLE color (
color_id BIGINT GENERATED ALWAYS AS IDENTITY UNIQUE,
color_name VARCHAR NOT NULL
);
SELECT create_distributed_table('color', 'color_id');
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SELECT undistribute_table('color');
ALTER SEQUENCE color_color_id_seq RENAME TO myseq;
SELECT create_distributed_table('color', 'color_id');
\ds+ myseq
\ds+ color_color_id_seq
\d color
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
\ds+ myseq
\ds+ color_color_id_seq
\d color
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
ALTER SEQUENCE myseq RENAME TO color_color_id_seq;
\ds+ myseq
\ds+ color_color_id_seq
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
\ds+ myseq
\ds+ color_color_id_seq
\d color
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SELECT alter_distributed_table('co23423lor', shard_count := 6);
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
\ds+ color_color_id_seq
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
DROP SCHEMA generated_identities CASCADE;

View File

@ -1,17 +0,0 @@
--
-- MULTI_CREATE_TABLE_NEW_FEATURES
--
-- Verify that the GENERATED ... AS IDENTITY feature in PostgreSQL 10
-- is forbidden in distributed tables.
CREATE TABLE table_identity_col (
id integer GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
payload text );
SELECT create_distributed_table('table_identity_col', 'id', 'append');
SELECT create_distributed_table('table_identity_col', 'id');
SELECT create_distributed_table('table_identity_col', 'payload');
SELECT create_reference_table('table_identity_col');