Identity column implementation refactorings (#6738)

This pull request proposes a change to the logic used for propagating
identity columns to worker nodes in citus. Instead of creating a
dependent sequence for each identity column and changing its default
value to `nextval(seq)/worker_nextval(seq)`, this update will pass the
identity columns as-is to the worker nodes.

Please note that there are a few limitations to this change. 

1. Only bigint identity columns will be allowed in distributed tables to
ensure compatibility with the DDL from any node functionality. Our
current distributed sequence implementation only allows insert
statements from all nodes for bigint sequences.
2. `alter_distributed_table` and `undistribute_table` operations will
not be allowed for tables with identity columns. This is because we do
not have a proper way of keeping sequence states consistent across the
cluster.

DESCRIPTION: Prevents using identity columns on data types other than
`bigint` on distributed tables
DESCRIPTION: Prevents using `alter_distributed_table` and
`undistribute_table` UDFs when a table has identity columns
DESCRIPTION: Fixes a bug that prevents enforcing identity column
restrictions on worker nodes

Depends on #6740
Fixes #6694
pull/6807/head
Gokhan Gulbiz 2023-03-30 10:41:01 +03:00 committed by GitHub
parent d3fb9288ab
commit e71bfd6074
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1233 additions and 940 deletions

View File

@ -942,6 +942,13 @@ CopyTableConversionReturnIntoCurrentContext(TableConversionReturn *tableConversi
static TableConversionReturn * static TableConversionReturn *
ConvertTable(TableConversionState *con) ConvertTable(TableConversionState *con)
{ {
/*
* We do not allow alter_distributed_table and undistribute_table operations
* for tables with identity columns. This is because we do not have a proper way
* of keeping sequence states consistent across the cluster.
*/
ErrorIfTableHasIdentityColumn(con->relationId);
/* /*
* when there are many partitions or colocated tables, memory usage is * when there are many partitions or colocated tables, memory usage is
* accumulated. Free context for each call to ConvertTable. * accumulated. Free context for each call to ConvertTable.
@ -1615,96 +1622,6 @@ CreateMaterializedViewDDLCommand(Oid matViewOid)
} }
/*
* This function marks all the identity sequences as distributed on the given table.
*/
static void
MarkIdentitiesAsDistributed(Oid targetRelationId)
{
Relation relation = relation_open(targetRelationId, AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(relation);
relation_close(relation, NoLock);
bool missingSequenceOk = false;
for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts;
attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex);
if (attributeForm->attidentity)
{
Oid seqOid = getIdentitySequence(targetRelationId, attributeForm->attnum,
missingSequenceOk);
ObjectAddress seqAddress = { 0 };
ObjectAddressSet(seqAddress, RelationRelationId, seqOid);
MarkObjectDistributed(&seqAddress);
}
}
}
/*
* This function returns sql statements to rename identites on the given table
*/
static void
PrepareRenameIdentitiesCommands(Oid sourceRelationId, Oid targetRelationId,
List **outCoordinatorCommands, List **outWorkerCommands)
{
Relation targetRelation = relation_open(targetRelationId, AccessShareLock);
TupleDesc targetTupleDescriptor = RelationGetDescr(targetRelation);
relation_close(targetRelation, NoLock);
bool missingSequenceOk = false;
for (int attributeIndex = 0; attributeIndex < targetTupleDescriptor->natts;
attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(targetTupleDescriptor,
attributeIndex);
if (attributeForm->attidentity)
{
char *columnName = NameStr(attributeForm->attname);
Oid targetSequenceOid = getIdentitySequence(targetRelationId,
attributeForm->attnum,
missingSequenceOk);
char *targetSequenceName = generate_relation_name(targetSequenceOid, NIL);
Oid sourceSequenceOid = getIdentitySequence(sourceRelationId,
attributeForm->attnum,
missingSequenceOk);
char *sourceSequenceName = generate_relation_name(sourceSequenceOid, NIL);
/* to rename sequence on the coordinator */
*outCoordinatorCommands = lappend(*outCoordinatorCommands, psprintf(
"SET citus.enable_ddl_propagation TO OFF; ALTER SEQUENCE %s RENAME TO %s; RESET citus.enable_ddl_propagation;",
quote_identifier(
targetSequenceName),
quote_identifier(
sourceSequenceName)));
/* update workers to use existing sequence and drop the new one generated by PG */
bool missingTableOk = true;
*outWorkerCommands = lappend(*outWorkerCommands,
GetAlterColumnWithNextvalDefaultCmd(
sourceSequenceOid, sourceRelationId,
columnName,
missingTableOk));
/* drop the sequence generated by identity column */
*outWorkerCommands = lappend(*outWorkerCommands, psprintf(
"DROP SEQUENCE IF EXISTS %s",
quote_identifier(
targetSequenceName)));
}
}
}
/* /*
* ReplaceTable replaces the source table with the target table. * ReplaceTable replaces the source table with the target table.
* It moves all the rows of the source table to target table with INSERT SELECT. * It moves all the rows of the source table to target table with INSERT SELECT.
@ -1763,24 +1680,6 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
ExecuteQueryViaSPI(query->data, SPI_OK_INSERT); ExecuteQueryViaSPI(query->data, SPI_OK_INSERT);
} }
/*
* Drop identity dependencies (sequences marked as DEPENDENCY_INTERNAL) on the workers
* to keep their states after the source table is dropped.
*/
List *ownedIdentitySequences = getOwnedSequences_internal(sourceId, 0,
DEPENDENCY_INTERNAL);
if (ownedIdentitySequences != NIL && ShouldSyncTableMetadata(sourceId))
{
char *qualifiedTableName = quote_qualified_identifier(schemaName, sourceName);
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT pg_catalog.worker_drop_sequence_dependency(%s);",
quote_literal_cstr(qualifiedTableName));
SendCommandToWorkersWithMetadata(command->data);
}
/* /*
* Modify regular sequence dependencies (sequences marked as DEPENDENCY_AUTO) * Modify regular sequence dependencies (sequences marked as DEPENDENCY_AUTO)
*/ */
@ -1840,23 +1739,6 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
quote_qualified_identifier(schemaName, sourceName)))); quote_qualified_identifier(schemaName, sourceName))));
} }
/*
* We need to prepare rename identities commands before dropping the original table,
* otherwise we can't find the original names of the identity sequences.
* We prepare separate commands for the coordinator and the workers because:
* In the coordinator, we simply need to rename the identity sequences
* to their names on the old table, because right now the identity
* sequences have default names generated by Postgres with the creation of the new table
* In the workers, we have not dropped the original identity sequences,
* so what we do is we alter the columns and set their default to the
* original identity sequences, and after that we drop the new sequences.
*/
List *coordinatorCommandsToRenameIdentites = NIL;
List *workerCommandsToRenameIdentites = NIL;
PrepareRenameIdentitiesCommands(sourceId, targetId,
&coordinatorCommandsToRenameIdentites,
&workerCommandsToRenameIdentites);
resetStringInfo(query); resetStringInfo(query);
appendStringInfo(query, "DROP %sTABLE %s CASCADE", appendStringInfo(query, "DROP %sTABLE %s CASCADE",
IsForeignTable(sourceId) ? "FOREIGN " : "", IsForeignTable(sourceId) ? "FOREIGN " : "",
@ -1874,27 +1756,6 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
quote_qualified_identifier(schemaName, targetName), quote_qualified_identifier(schemaName, targetName),
quote_identifier(sourceName)); quote_identifier(sourceName));
ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY); ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY);
char *coordinatorCommand = NULL;
foreach_ptr(coordinatorCommand, coordinatorCommandsToRenameIdentites)
{
ExecuteQueryViaSPI(coordinatorCommand, SPI_OK_UTILITY);
}
char *workerCommand = NULL;
foreach_ptr(workerCommand, workerCommandsToRenameIdentites)
{
SendCommandToWorkersWithMetadata(workerCommand);
}
/*
* To preserve identity sequences states in case of redistributing the table again,
* we don't drop them when we undistribute a table. To maintain consistency and
* avoid future problems if we redistribute the table, we want to apply all changes happening to
* the identity sequence in the coordinator to their corresponding sequences in the workers as well.
* That's why we have to mark identity sequences as distributed
*/
MarkIdentitiesAsDistributed(targetId);
} }

View File

@ -1147,7 +1147,7 @@ DropIdentitiesOnTable(Oid relationId)
{ {
Relation relation = relation_open(relationId, AccessShareLock); Relation relation = relation_open(relationId, AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(relation); TupleDesc tupleDescriptor = RelationGetDescr(relation);
relation_close(relation, NoLock); List *dropCommandList = NIL;
for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts; for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts;
attributeIndex++) attributeIndex++)
@ -1167,16 +1167,24 @@ DropIdentitiesOnTable(Oid relationId)
qualifiedTableName, qualifiedTableName,
columnName); columnName);
dropCommandList = lappend(dropCommandList, dropCommand->data);
}
}
relation_close(relation, NoLock);
char *dropCommand = NULL;
foreach_ptr(dropCommand, dropCommandList)
{
/* /*
* We need to disable/enable ddl propagation for this command, to prevent * We need to disable/enable ddl propagation for this command, to prevent
* sending unnecessary ALTER COLUMN commands for partitions, to MX workers. * sending unnecessary ALTER COLUMN commands for partitions, to MX workers.
*/ */
ExecuteAndLogUtilityCommandList(list_make3(DISABLE_DDL_PROPAGATION, ExecuteAndLogUtilityCommandList(list_make3(DISABLE_DDL_PROPAGATION,
dropCommand->data, dropCommand,
ENABLE_DDL_PROPAGATION)); ENABLE_DDL_PROPAGATION));
} }
} }
}
/* /*

View File

@ -1369,7 +1369,7 @@ EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid ownerRelationId
foreach_oid(citusTableId, citusTableIdList) foreach_oid(citusTableId, citusTableIdList)
{ {
List *seqInfoList = NIL; List *seqInfoList = NIL;
GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0); GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0, DEPENDENCY_AUTO);
SequenceInfo *seqInfo = NULL; SequenceInfo *seqInfo = NULL;
foreach_ptr(seqInfo, seqInfoList) foreach_ptr(seqInfo, seqInfoList)
@ -1446,7 +1446,7 @@ EnsureRelationHasCompatibleSequenceTypes(Oid relationId)
{ {
List *seqInfoList = NIL; List *seqInfoList = NIL;
GetDependentSequencesWithRelation(relationId, &seqInfoList, 0); GetDependentSequencesWithRelation(relationId, &seqInfoList, 0, DEPENDENCY_AUTO);
EnsureDistributedSequencesHaveOneType(relationId, seqInfoList); EnsureDistributedSequencesHaveOneType(relationId, seqInfoList);
} }
@ -1795,6 +1795,8 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
{ {
Oid parentRelationId = InvalidOid; Oid parentRelationId = InvalidOid;
ErrorIfTableHasUnsupportedIdentityColumn(relationId);
EnsureLocalTableEmptyIfNecessary(relationId, distributionMethod); EnsureLocalTableEmptyIfNecessary(relationId, distributionMethod);
/* user really wants triggers? */ /* user really wants triggers? */

View File

@ -370,7 +370,7 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
bool creatingShellTableOnRemoteNode = true; bool creatingShellTableOnRemoteNode = true;
List *tableDDLCommands = GetFullTableCreationCommands(relationId, List *tableDDLCommands = GetFullTableCreationCommands(relationId,
WORKER_NEXTVAL_SEQUENCE_DEFAULTS, WORKER_NEXTVAL_SEQUENCE_DEFAULTS,
INCLUDE_IDENTITY_AS_SEQUENCE_DEFAULTS, INCLUDE_IDENTITY,
creatingShellTableOnRemoteNode); creatingShellTableOnRemoteNode);
TableDDLCommand *tableDDLCommand = NULL; TableDDLCommand *tableDDLCommand = NULL;
foreach_ptr(tableDDLCommand, tableDDLCommands) foreach_ptr(tableDDLCommand, tableDDLCommands)

View File

@ -33,7 +33,8 @@
/* Local functions forward declarations for helper functions */ /* Local functions forward declarations for helper functions */
static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId); static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId);
static Oid SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress); static Oid SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress, char
depType);
static List * FilterDistributedSequences(GrantStmt *stmt); static List * FilterDistributedSequences(GrantStmt *stmt);
@ -183,7 +184,7 @@ ExtractDefaultColumnsAndOwnedSequences(Oid relationId, List **columnNameList,
char *columnName = NameStr(attributeForm->attname); char *columnName = NameStr(attributeForm->attname);
List *columnOwnedSequences = List *columnOwnedSequences =
getOwnedSequences_internal(relationId, attributeIndex + 1, 0); getOwnedSequences_internal(relationId, attributeIndex + 1, DEPENDENCY_AUTO);
if (attributeForm->atthasdef && list_length(columnOwnedSequences) == 0) if (attributeForm->atthasdef && list_length(columnOwnedSequences) == 0)
{ {
@ -453,21 +454,22 @@ PreprocessAlterSequenceStmt(Node *node, const char *queryString,
/* the code-path only supports a single object */ /* the code-path only supports a single object */
Assert(list_length(addresses) == 1); Assert(list_length(addresses) == 1);
/* We have already asserted that we have exactly 1 address in the addresses. */
ObjectAddress *address = linitial(addresses);
/* error out if the sequence is distributed */ /* error out if the sequence is distributed */
if (IsAnyObjectDistributed(addresses)) if (IsAnyObjectDistributed(addresses) || SequenceUsedInDistributedTable(address,
DEPENDENCY_INTERNAL))
{ {
ereport(ERROR, (errmsg( ereport(ERROR, (errmsg(
"Altering a distributed sequence is currently not supported."))); "Altering a distributed sequence is currently not supported.")));
} }
/* We have already asserted that we have exactly 1 address in the addresses. */
ObjectAddress *address = linitial(addresses);
/* /*
* error out if the sequence is used in a distributed table * error out if the sequence is used in a distributed table
* and this is an ALTER SEQUENCE .. AS .. statement * and this is an ALTER SEQUENCE .. AS .. statement
*/ */
Oid citusTableId = SequenceUsedInDistributedTable(address); Oid citusTableId = SequenceUsedInDistributedTable(address, DEPENDENCY_AUTO);
if (citusTableId != InvalidOid) if (citusTableId != InvalidOid)
{ {
List *options = stmt->options; List *options = stmt->options;
@ -497,16 +499,19 @@ PreprocessAlterSequenceStmt(Node *node, const char *queryString,
* SequenceUsedInDistributedTable returns true if the argument sequence * SequenceUsedInDistributedTable returns true if the argument sequence
* is used as the default value of a column in a distributed table. * is used as the default value of a column in a distributed table.
* Returns false otherwise * Returns false otherwise
* See DependencyType for the possible values of depType.
* We use DEPENDENCY_INTERNAL for sequences created by identity column.
* DEPENDENCY_AUTO for regular sequences.
*/ */
static Oid static Oid
SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress) SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress, char depType)
{ {
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
Oid citusTableId = InvalidOid; Oid citusTableId = InvalidOid;
foreach_oid(citusTableId, citusTableIdList) foreach_oid(citusTableId, citusTableIdList)
{ {
List *seqInfoList = NIL; List *seqInfoList = NIL;
GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0); GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0, depType);
SequenceInfo *seqInfo = NULL; SequenceInfo *seqInfo = NULL;
foreach_ptr(seqInfo, seqInfoList) foreach_ptr(seqInfo, seqInfoList)
{ {

View File

@ -1378,29 +1378,6 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
} }
} }
/*
* We check for ADD COLUMN .. GENERATED .. AS IDENTITY expr
* since it uses a sequence as an internal dependency
* we should deparse the statement
*/
constraint = NULL;
foreach_ptr(constraint, columnConstraints)
{
if (constraint->contype == CONSTR_IDENTITY)
{
deparseAT = true;
useInitialDDLCommandString = false;
/*
* Since we don't support constraints for AT_AddColumn
* we have to set is_not_null to true explicitly for identity columns
*/
ColumnDef *newColDef = copyObject(columnDefinition);
newColDef->constraints = NULL;
newColDef->is_not_null = true;
newCmd->def = (Node *) newColDef;
}
}
/* /*
* We check for ADD COLUMN .. SERIAL pseudo-type * We check for ADD COLUMN .. SERIAL pseudo-type
@ -2542,34 +2519,6 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
} }
} }
} }
/*
* We check for ADD COLUMN .. GENERATED AS IDENTITY expr
* since it uses a seqeunce as an internal dependency
*/
constraint = NULL;
foreach_ptr(constraint, columnConstraints)
{
if (constraint->contype == CONSTR_IDENTITY)
{
AttrNumber attnum = get_attnum(relationId,
columnDefinition->colname);
bool missing_ok = false;
Oid seqOid = getIdentitySequence(relationId, attnum, missing_ok);
if (ShouldSyncTableMetadata(relationId))
{
needMetadataSyncForNewSequences = true;
alterTableDefaultNextvalCmd =
GetAddColumnWithNextvalDefaultCmd(seqOid,
relationId,
columnDefinition
->colname,
columnDefinition
->typeName);
}
}
}
} }
/* /*
* We check for ALTER COLUMN .. SET DEFAULT nextval('user_defined_seq') * We check for ALTER COLUMN .. SET DEFAULT nextval('user_defined_seq')
@ -3225,6 +3174,17 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
{ {
if (columnConstraint->contype == CONSTR_IDENTITY) if (columnConstraint->contype == CONSTR_IDENTITY)
{ {
/*
* We currently don't support adding an identity column for an MX table
*/
if (ShouldSyncTableMetadata(relationId))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"cannot execute ADD COLUMN commands involving identity"
" columns when metadata is synchronized to workers")));
}
/* /*
* Currently we don't support backfilling the new identity column with default values * Currently we don't support backfilling the new identity column with default values
* if the table is not empty * if the table is not empty
@ -3355,7 +3315,8 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
*/ */
AttrNumber attnum = get_attnum(relationId, command->name); AttrNumber attnum = get_attnum(relationId, command->name);
List *seqInfoList = NIL; List *seqInfoList = NIL;
GetDependentSequencesWithRelation(relationId, &seqInfoList, attnum); GetDependentSequencesWithRelation(relationId, &seqInfoList, attnum,
DEPENDENCY_AUTO);
if (seqInfoList != NIL) if (seqInfoList != NIL)
{ {
ereport(ERROR, (errmsg("cannot execute ALTER COLUMN TYPE .. command " ereport(ERROR, (errmsg("cannot execute ALTER COLUMN TYPE .. command "
@ -4014,3 +3975,59 @@ MakeNameListFromRangeVar(const RangeVar *rel)
return list_make1(makeString(rel->relname)); return list_make1(makeString(rel->relname));
} }
} }
/*
* ErrorIfTableHasUnsupportedIdentityColumn errors out if the given table has any identity column other than bigint identity column.
*/
void
ErrorIfTableHasUnsupportedIdentityColumn(Oid relationId)
{
Relation relation = relation_open(relationId, AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(relation);
for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts;
attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex);
if (attributeForm->attidentity && attributeForm->atttypid != INT8OID)
{
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"cannot complete operation on %s with smallint/int identity column",
qualifiedRelationName),
errhint(
"Use bigint identity column instead.")));
}
}
relation_close(relation, NoLock);
}
/*
* ErrorIfTableHasIdentityColumn errors out if the given table has identity column
*/
void
ErrorIfTableHasIdentityColumn(Oid relationId)
{
Relation relation = relation_open(relationId, AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(relation);
for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts;
attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex);
if (attributeForm->attidentity)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"cannot complete operation on a table with identity column")));
}
}
relation_close(relation, NoLock);
}

View File

@ -304,10 +304,7 @@ pg_get_sequencedef(Oid sequenceRelationId)
* When it's WORKER_NEXTVAL_SEQUENCE_DEFAULTS, the function creates the DEFAULT * When it's WORKER_NEXTVAL_SEQUENCE_DEFAULTS, the function creates the DEFAULT
* clause using worker_nextval('sequence') and not nextval('sequence') * clause using worker_nextval('sequence') and not nextval('sequence')
* When IncludeIdentities is NO_IDENTITY, the function does not include identity column * When IncludeIdentities is NO_IDENTITY, the function does not include identity column
* specifications. When it's INCLUDE_IDENTITY_AS_SEQUENCE_DEFAULTS, the function * specifications. When it's INCLUDE_IDENTITY it creates GENERATED .. AS IDENTIY clauses.
* uses sequences and set them as default values for identity columns by using exactly
* the same approach with worker_nextval('sequence') & nextval('sequence') logic
* desribed above. When it's INCLUDE_IDENTITY it creates GENERATED .. AS IDENTIY clauses.
*/ */
char * char *
pg_get_tableschemadef_string(Oid tableRelationId, IncludeSequenceDefaults pg_get_tableschemadef_string(Oid tableRelationId, IncludeSequenceDefaults
@ -403,26 +400,9 @@ pg_get_tableschemadef_string(Oid tableRelationId, IncludeSequenceDefaults
Oid seqOid = getIdentitySequence(RelationGetRelid(relation), Oid seqOid = getIdentitySequence(RelationGetRelid(relation),
attributeForm->attnum, missing_ok); attributeForm->attnum, missing_ok);
char *sequenceName = generate_qualified_relation_name(seqOid); if (includeIdentityDefaults == INCLUDE_IDENTITY)
if (includeIdentityDefaults == INCLUDE_IDENTITY_AS_SEQUENCE_DEFAULTS)
{
if (pg_get_sequencedef(seqOid)->seqtypid != INT8OID)
{
appendStringInfo(&buffer,
" DEFAULT worker_nextval(%s::regclass)",
quote_literal_cstr(sequenceName));
}
else
{
appendStringInfo(&buffer, " DEFAULT nextval(%s::regclass)",
quote_literal_cstr(sequenceName));
}
}
else if (includeIdentityDefaults == INCLUDE_IDENTITY)
{ {
Form_pg_sequence pgSequenceForm = pg_get_sequencedef(seqOid); Form_pg_sequence pgSequenceForm = pg_get_sequencedef(seqOid);
uint64 sequenceStart = nextval_internal(seqOid, false);
char *sequenceDef = psprintf( char *sequenceDef = psprintf(
" GENERATED %s AS IDENTITY (INCREMENT BY " INT64_FORMAT \ " GENERATED %s AS IDENTITY (INCREMENT BY " INT64_FORMAT \
" MINVALUE " INT64_FORMAT " MAXVALUE " " MINVALUE " INT64_FORMAT " MAXVALUE "
@ -433,7 +413,8 @@ pg_get_tableschemadef_string(Oid tableRelationId, IncludeSequenceDefaults
"ALWAYS" : "BY DEFAULT", "ALWAYS" : "BY DEFAULT",
pgSequenceForm->seqincrement, pgSequenceForm->seqincrement,
pgSequenceForm->seqmin, pgSequenceForm->seqmin,
pgSequenceForm->seqmax, sequenceStart, pgSequenceForm->seqmax,
pgSequenceForm->seqstart,
pgSequenceForm->seqcache, pgSequenceForm->seqcache,
pgSequenceForm->seqcycle ? "" : "NO "); pgSequenceForm->seqcycle ? "" : "NO ");

View File

@ -1870,7 +1870,7 @@ static List *
GetRelationSequenceDependencyList(Oid relationId) GetRelationSequenceDependencyList(Oid relationId)
{ {
List *seqInfoList = NIL; List *seqInfoList = NIL;
GetDependentSequencesWithRelation(relationId, &seqInfoList, 0); GetDependentSequencesWithRelation(relationId, &seqInfoList, 0, DEPENDENCY_AUTO);
List *seqIdList = NIL; List *seqIdList = NIL;
SequenceInfo *seqInfo = NULL; SequenceInfo *seqInfo = NULL;

View File

@ -1632,10 +1632,13 @@ GetAttributeTypeOid(Oid relationId, AttrNumber attnum)
* For both cases, we use the intermediate AttrDefault object from pg_depend. * For both cases, we use the intermediate AttrDefault object from pg_depend.
* If attnum is specified, we only return the sequences related to that * If attnum is specified, we only return the sequences related to that
* attribute of the relationId. * attribute of the relationId.
* See DependencyType for the possible values of depType.
* We use DEPENDENCY_INTERNAL for sequences created by identity column.
* DEPENDENCY_AUTO for regular sequences.
*/ */
void void
GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList, GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList,
AttrNumber attnum) AttrNumber attnum, char depType)
{ {
Assert(*seqInfoList == NIL); Assert(*seqInfoList == NIL);
@ -1672,7 +1675,7 @@ GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList,
if (deprec->classid == AttrDefaultRelationId && if (deprec->classid == AttrDefaultRelationId &&
deprec->objsubid == 0 && deprec->objsubid == 0 &&
deprec->refobjsubid != 0 && deprec->refobjsubid != 0 &&
deprec->deptype == DEPENDENCY_AUTO) deprec->deptype == depType)
{ {
/* /*
* We are going to generate corresponding SequenceInfo * We are going to generate corresponding SequenceInfo
@ -1681,8 +1684,7 @@ GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList,
attrdefResult = lappend_oid(attrdefResult, deprec->objid); attrdefResult = lappend_oid(attrdefResult, deprec->objid);
attrdefAttnumResult = lappend_int(attrdefAttnumResult, deprec->refobjsubid); attrdefAttnumResult = lappend_int(attrdefAttnumResult, deprec->refobjsubid);
} }
else if ((deprec->deptype == DEPENDENCY_AUTO || deprec->deptype == else if (deprec->deptype == depType &&
DEPENDENCY_INTERNAL) &&
deprec->refobjsubid != 0 && deprec->refobjsubid != 0 &&
deprec->classid == RelationRelationId && deprec->classid == RelationRelationId &&
get_rel_relkind(deprec->objid) == RELKIND_SEQUENCE) get_rel_relkind(deprec->objid) == RELKIND_SEQUENCE)
@ -1929,6 +1931,53 @@ SequenceDependencyCommandList(Oid relationId)
} }
/*
* IdentitySequenceDependencyCommandList generate a command to execute
* a UDF (WORKER_ADJUST_IDENTITY_COLUMN_SEQ_RANGES) on workers to modify the identity
* columns min/max values to produce unique values on workers.
*/
List *
IdentitySequenceDependencyCommandList(Oid targetRelationId)
{
List *commandList = NIL;
Relation relation = relation_open(targetRelationId, AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(relation);
bool tableHasIdentityColumn = false;
for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts;
attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex);
if (attributeForm->attidentity)
{
tableHasIdentityColumn = true;
break;
}
}
relation_close(relation, NoLock);
if (tableHasIdentityColumn)
{
StringInfo stringInfo = makeStringInfo();
char *tableName = generate_qualified_relation_name(targetRelationId);
appendStringInfo(stringInfo,
WORKER_ADJUST_IDENTITY_COLUMN_SEQ_RANGES,
quote_literal_cstr(tableName));
commandList = lappend(commandList,
makeTableDDLCommandString(
stringInfo->data));
}
return commandList;
}
/* /*
* CreateSequenceDependencyCommand generates a query string for calling * CreateSequenceDependencyCommand generates a query string for calling
* worker_record_sequence_dependency on the worker to recreate a sequence->table * worker_record_sequence_dependency on the worker to recreate a sequence->table
@ -2651,8 +2700,7 @@ CreateShellTableOnWorkers(Oid relationId)
List *commandList = list_make1(DISABLE_DDL_PROPAGATION); List *commandList = list_make1(DISABLE_DDL_PROPAGATION);
IncludeSequenceDefaults includeSequenceDefaults = WORKER_NEXTVAL_SEQUENCE_DEFAULTS; IncludeSequenceDefaults includeSequenceDefaults = WORKER_NEXTVAL_SEQUENCE_DEFAULTS;
IncludeIdentities includeIdentityDefaults = IncludeIdentities includeIdentityDefaults = INCLUDE_IDENTITY;
INCLUDE_IDENTITY_AS_SEQUENCE_DEFAULTS;
bool creatingShellTableOnRemoteNode = true; bool creatingShellTableOnRemoteNode = true;
List *tableDDLCommands = GetFullTableCreationCommands(relationId, List *tableDDLCommands = GetFullTableCreationCommands(relationId,

View File

@ -461,10 +461,7 @@ ResolveRelationId(text *relationName, bool missingOk)
* definition, optional column storage and statistics definitions, and index * definition, optional column storage and statistics definitions, and index
* constraint and trigger definitions. * constraint and trigger definitions.
* When IncludeIdentities is NO_IDENTITY, the function does not include identity column * When IncludeIdentities is NO_IDENTITY, the function does not include identity column
* specifications. When it's INCLUDE_IDENTITY_AS_SEQUENCE_DEFAULTS, the function * specifications. When it's INCLUDE_IDENTITY it creates GENERATED .. AS IDENTIY clauses.
* uses sequences and set them as default values for identity columns by using exactly
* the same approach with worker_nextval('sequence') & nextval('sequence') logic
* desribed above. When it's INCLUDE_IDENTITY it creates GENERATED .. AS IDENTIY clauses.
*/ */
List * List *
GetFullTableCreationCommands(Oid relationId, GetFullTableCreationCommands(Oid relationId,
@ -500,6 +497,15 @@ GetFullTableCreationCommands(Oid relationId,
tableDDLEventList = lappend(tableDDLEventList, tableDDLEventList = lappend(tableDDLEventList,
truncateTriggerCommand); truncateTriggerCommand);
} }
/*
* For identity column sequences, we only need to modify
* their min/max values to produce unique values on the worker nodes.
*/
List *identitySequenceDependencyCommandList =
IdentitySequenceDependencyCommandList(relationId);
tableDDLEventList = list_concat(tableDDLEventList,
identitySequenceDependencyCommandList);
} }
tableDDLEventList = list_concat(tableDDLEventList, postLoadCreationCommandList); tableDDLEventList = list_concat(tableDDLEventList, postLoadCreationCommandList);

View File

@ -1,6 +1,6 @@
-- citus--11.2-1--11.3-1 -- citus--11.2-1--11.3-1
#include "udfs/repl_origin_helper/11.3-1.sql" #include "udfs/repl_origin_helper/11.3-1.sql"
#include "udfs/worker_adjust_identity_column_seq_ranges/11.3-1.sql"
ALTER TABLE pg_catalog.pg_dist_authinfo REPLICA IDENTITY USING INDEX pg_dist_authinfo_identification_index; ALTER TABLE pg_catalog.pg_dist_authinfo REPLICA IDENTITY USING INDEX pg_dist_authinfo_identification_index;
ALTER TABLE pg_catalog.pg_dist_partition REPLICA IDENTITY USING INDEX pg_dist_partition_logical_relid_index; ALTER TABLE pg_catalog.pg_dist_partition REPLICA IDENTITY USING INDEX pg_dist_partition_logical_relid_index;
ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY USING INDEX pg_dist_placement_placementid_index; ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY USING INDEX pg_dist_placement_placementid_index;

View File

@ -3,6 +3,13 @@
DROP FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking(); DROP FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking();
DROP FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking(); DROP FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking();
DROP FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active(); DROP FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active();
DROP FUNCTION IF EXISTS pg_catalog.worker_adjust_identity_column_seq_ranges(regclass);
ALTER TABLE pg_catalog.pg_dist_authinfo REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_partition REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_shard REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_authinfo REPLICA IDENTITY NOTHING; ALTER TABLE pg_catalog.pg_dist_authinfo REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_partition REPLICA IDENTITY NOTHING; ALTER TABLE pg_catalog.pg_dist_partition REPLICA IDENTITY NOTHING;

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.worker_adjust_identity_column_seq_ranges(regclass)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_adjust_identity_column_seq_ranges$$;
COMMENT ON FUNCTION pg_catalog.worker_adjust_identity_column_seq_ranges(regclass)
IS 'modify identity column seq ranges to produce globally unique values';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.worker_adjust_identity_column_seq_ranges(regclass)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_adjust_identity_column_seq_ranges$$;
COMMENT ON FUNCTION pg_catalog.worker_adjust_identity_column_seq_ranges(regclass)
IS 'modify identity column seq ranges to produce globally unique values';

View File

@ -70,6 +70,7 @@ static void AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequence
PG_FUNCTION_INFO_V1(worker_apply_shard_ddl_command); PG_FUNCTION_INFO_V1(worker_apply_shard_ddl_command);
PG_FUNCTION_INFO_V1(worker_apply_inter_shard_ddl_command); PG_FUNCTION_INFO_V1(worker_apply_inter_shard_ddl_command);
PG_FUNCTION_INFO_V1(worker_apply_sequence_command); PG_FUNCTION_INFO_V1(worker_apply_sequence_command);
PG_FUNCTION_INFO_V1(worker_adjust_identity_column_seq_ranges);
PG_FUNCTION_INFO_V1(worker_append_table_to_shard); PG_FUNCTION_INFO_V1(worker_append_table_to_shard);
PG_FUNCTION_INFO_V1(worker_nextval); PG_FUNCTION_INFO_V1(worker_nextval);
@ -133,6 +134,60 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS)
} }
/*
* worker_adjust_identity_column_seq_ranges takes a table oid, runs an ALTER SEQUENCE statement
* for each identity column to adjust the minvalue and maxvalue of the sequence owned by
* identity column such that the sequence creates globally unique values.
* We use table oid instead of sequence name to avoid any potential conflicts between sequences of different tables. This way, we can safely iterate through identity columns on a specific table without any issues. While this may introduce a small amount of business logic to workers, it's a much safer approach overall.
*/
Datum
worker_adjust_identity_column_seq_ranges(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid tableRelationId = PG_GETARG_OID(0);
EnsureTableOwner(tableRelationId);
Relation tableRelation = relation_open(tableRelationId, AccessShareLock);
TupleDesc tableTupleDesc = RelationGetDescr(tableRelation);
bool missingSequenceOk = false;
for (int attributeIndex = 0; attributeIndex < tableTupleDesc->natts;
attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(tableTupleDesc,
attributeIndex);
/* skip dropped columns */
if (attributeForm->attisdropped)
{
continue;
}
if (attributeForm->attidentity)
{
Oid sequenceOid = getIdentitySequence(tableRelationId,
attributeForm->attnum,
missingSequenceOk);
Oid sequenceSchemaOid = get_rel_namespace(sequenceOid);
char *sequenceSchemaName = get_namespace_name(sequenceSchemaOid);
char *sequenceName = get_rel_name(sequenceOid);
Oid sequenceTypeId = pg_get_sequencedef(sequenceOid)->seqtypid;
AlterSequenceMinMax(sequenceOid, sequenceSchemaName, sequenceName,
sequenceTypeId);
}
}
relation_close(tableRelation, NoLock);
PG_RETURN_VOID();
}
/* /*
* worker_apply_sequence_command takes a CREATE SEQUENCE command string, runs the * worker_apply_sequence_command takes a CREATE SEQUENCE command string, runs the
* CREATE SEQUENCE command then creates and runs an ALTER SEQUENCE statement * CREATE SEQUENCE command then creates and runs an ALTER SEQUENCE statement

View File

@ -584,6 +584,9 @@ extern bool ConstrTypeCitusCanDefaultName(ConstrType constrType);
extern char * GetAlterColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId, extern char * GetAlterColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId,
char *colname, bool missingTableOk); char *colname, bool missingTableOk);
extern void ErrorIfTableHasUnsupportedIdentityColumn(Oid relationId);
extern void ErrorIfTableHasIdentityColumn(Oid relationId);
/* text_search.c - forward declarations */ /* text_search.c - forward declarations */
extern List * GetCreateTextSearchConfigStatements(const ObjectAddress *address); extern List * GetCreateTextSearchConfigStatements(const ObjectAddress *address);
extern List * GetCreateTextSearchDictionaryStatements(const ObjectAddress *address); extern List * GetCreateTextSearchDictionaryStatements(const ObjectAddress *address);

View File

@ -124,8 +124,7 @@ typedef enum IncludeSequenceDefaults
typedef enum IncludeIdentities typedef enum IncludeIdentities
{ {
NO_IDENTITY = 0, /* don't include identities */ NO_IDENTITY = 0, /* don't include identities */
INCLUDE_IDENTITY_AS_SEQUENCE_DEFAULTS = 1, /* include identities as sequences */ INCLUDE_IDENTITY = 1 /* include identities as-is*/
INCLUDE_IDENTITY = 2 /* include identities as-is*/
} IncludeIdentities; } IncludeIdentities;

View File

@ -101,11 +101,12 @@ extern void SyncNodeMetadataToNodesMain(Datum main_arg);
extern void SignalMetadataSyncDaemon(Oid database, int sig); extern void SignalMetadataSyncDaemon(Oid database, int sig);
extern bool ShouldInitiateMetadataSync(bool *lockFailure); extern bool ShouldInitiateMetadataSync(bool *lockFailure);
extern List * SequenceDependencyCommandList(Oid relationId); extern List * SequenceDependencyCommandList(Oid relationId);
extern List * IdentitySequenceDependencyCommandList(Oid targetRelationId);
extern List * DDLCommandsForSequence(Oid sequenceOid, char *ownerName); extern List * DDLCommandsForSequence(Oid sequenceOid, char *ownerName);
extern List * GetSequencesFromAttrDef(Oid attrdefOid); extern List * GetSequencesFromAttrDef(Oid attrdefOid);
extern void GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList, extern void GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList,
AttrNumber attnum); AttrNumber attnum, char depType);
extern List * GetDependentFunctionsWithRelation(Oid relationId); extern List * GetDependentFunctionsWithRelation(Oid relationId);
extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum); extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum);
extern void SetLocalEnableMetadataSync(bool state); extern void SetLocalEnableMetadataSync(bool state);
@ -146,6 +147,8 @@ extern void SyncDeleteColocationGroupToNodes(uint32 colocationId);
"placementid = EXCLUDED.placementid" "placementid = EXCLUDED.placementid"
#define METADATA_SYNC_CHANNEL "metadata_sync" #define METADATA_SYNC_CHANNEL "metadata_sync"
#define WORKER_ADJUST_IDENTITY_COLUMN_SEQ_RANGES \
"SELECT pg_catalog.worker_adjust_identity_column_seq_ranges(%s)"
/* controlled via GUC */ /* controlled via GUC */
extern char *EnableManualMetadataChangesForUser; extern char *EnableManualMetadataChangesForUser;

View File

@ -1,527 +1,415 @@
-- This test file has an alternative output because of error messages vary for PG13
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int <= 13 AS server_version_le_13;
server_version_le_13
---------------------------------------------------------------------
f
(1 row)
CREATE SCHEMA generated_identities; CREATE SCHEMA generated_identities;
SET search_path TO generated_identities; SET search_path TO generated_identities;
SET client_min_messages to ERROR; SET client_min_messages to ERROR;
SET citus.shard_replication_factor TO 1;
SELECT 1 from citus_add_node('localhost', :master_port, groupId=>0); SELECT 1 from citus_add_node('localhost', :master_port, groupId=>0);
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1
(1 row) (1 row)
DROP TABLE IF EXISTS generated_identities_test; -- smallint identity column can not be distributed
-- create a partitioned table for testing. CREATE TABLE smallint_identity_column (
CREATE TABLE generated_identities_test ( a smallint GENERATED BY DEFAULT AS IDENTITY
a int CONSTRAINT myconname GENERATED BY DEFAULT AS IDENTITY, );
b bigint GENERATED ALWAYS AS IDENTITY (START WITH 10 INCREMENT BY 10), SELECT create_distributed_table('smallint_identity_column', 'a');
c smallint GENERATED BY DEFAULT AS IDENTITY, ERROR: cannot complete operation on generated_identities.smallint_identity_column with smallint/int identity column
d serial, HINT: Use bigint identity column instead.
e bigserial, SELECT create_distributed_table_concurrently('smallint_identity_column', 'a');
f smallserial, ERROR: cannot complete operation on generated_identities.smallint_identity_column with smallint/int identity column
g int HINT: Use bigint identity column instead.
) SELECT create_reference_table('smallint_identity_column');
PARTITION BY RANGE (a); ERROR: cannot complete operation on a table with identity column
CREATE TABLE generated_identities_test_1_5 PARTITION OF generated_identities_test FOR VALUES FROM (1) TO (5); SELECT citus_add_local_table_to_metadata('smallint_identity_column');
CREATE TABLE generated_identities_test_5_50 PARTITION OF generated_identities_test FOR VALUES FROM (5) TO (50);
-- local tables
SELECT citus_add_local_table_to_metadata('generated_identities_test');
citus_add_local_table_to_metadata citus_add_local_table_to_metadata
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
\d generated_identities_test DROP TABLE smallint_identity_column;
Partitioned table "generated_identities.generated_identities_test" -- int identity column can not be distributed
Column | Type | Collation | Nullable | Default CREATE TABLE int_identity_column (
--------------------------------------------------------------------- a int GENERATED BY DEFAULT AS IDENTITY
a | integer | | not null | generated by default as identity );
b | bigint | | not null | generated always as identity SELECT create_distributed_table('int_identity_column', 'a');
c | smallint | | not null | generated by default as identity ERROR: cannot complete operation on generated_identities.int_identity_column with smallint/int identity column
d | integer | | not null | nextval('generated_identities_test_d_seq'::regclass) HINT: Use bigint identity column instead.
e | bigint | | not null | nextval('generated_identities_test_e_seq'::regclass) SELECT create_distributed_table_concurrently('int_identity_column', 'a');
f | smallint | | not null | nextval('generated_identities_test_f_seq'::regclass) ERROR: cannot complete operation on generated_identities.int_identity_column with smallint/int identity column
g | integer | | | HINT: Use bigint identity column instead.
Partition key: RANGE (a) SELECT create_reference_table('int_identity_column');
Number of partitions: 2 (Use \d+ to list them.) ERROR: cannot complete operation on a table with identity column
SELECT citus_add_local_table_to_metadata('int_identity_column');
\c - - - :worker_1_port citus_add_local_table_to_metadata
\d generated_identities.generated_identities_test
Partitioned table "generated_identities.generated_identities_test"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | integer | | not null | worker_nextval('generated_identities.generated_identities_test_a_seq'::regclass)
b | bigint | | not null | nextval('generated_identities.generated_identities_test_b_seq'::regclass)
c | smallint | | not null | worker_nextval('generated_identities.generated_identities_test_c_seq'::regclass)
d | integer | | not null | worker_nextval('generated_identities.generated_identities_test_d_seq'::regclass)
e | bigint | | not null | nextval('generated_identities.generated_identities_test_e_seq'::regclass)
f | smallint | | not null | worker_nextval('generated_identities.generated_identities_test_f_seq'::regclass)
g | integer | | |
Partition key: RANGE (a)
Number of partitions: 2 (Use \d+ to list them.)
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SELECT undistribute_table('generated_identities_test');
undistribute_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
SELECT citus_remove_node('localhost', :master_port); DROP TABLE int_identity_column;
citus_remove_node RESET citus.shard_replication_factor;
CREATE TABLE bigint_identity_column (
a bigint GENERATED BY DEFAULT AS IDENTITY,
b int
);
SELECT citus_add_local_table_to_metadata('bigint_identity_column');
citus_add_local_table_to_metadata
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
SELECT create_distributed_table('generated_identities_test', 'a'); DROP TABLE bigint_identity_column;
CREATE TABLE bigint_identity_column (
a bigint GENERATED BY DEFAULT AS IDENTITY,
b int
);
SELECT create_distributed_table('bigint_identity_column', 'a');
create_distributed_table create_distributed_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
\d generated_identities_test \d bigint_identity_column
Partitioned table "generated_identities.generated_identities_test" Table "generated_identities.bigint_identity_column"
Column | Type | Collation | Nullable | Default Column | Type | Collation | Nullable | Default
--------------------------------------------------------------------- ---------------------------------------------------------------------
a | integer | | not null | generated by default as identity a | bigint | | not null | generated by default as identity
b | bigint | | not null | generated always as identity b | integer | | |
c | smallint | | not null | generated by default as identity
d | integer | | not null | nextval('generated_identities_test_d_seq'::regclass)
e | bigint | | not null | nextval('generated_identities_test_e_seq'::regclass)
f | smallint | | not null | nextval('generated_identities_test_f_seq'::regclass)
g | integer | | |
Partition key: RANGE (a)
Number of partitions: 2 (Use \d+ to list them.)
\c - - - :worker_1_port \c - - - :worker_1_port
\d generated_identities.generated_identities_test SET search_path TO generated_identities;
Partitioned table "generated_identities.generated_identities_test" SET client_min_messages to ERROR;
INSERT INTO bigint_identity_column (b)
SELECT s FROM generate_series(1,10) s;
\d generated_identities.bigint_identity_column
Table "generated_identities.bigint_identity_column"
Column | Type | Collation | Nullable | Default Column | Type | Collation | Nullable | Default
--------------------------------------------------------------------- ---------------------------------------------------------------------
a | integer | | not null | worker_nextval('generated_identities.generated_identities_test_a_seq'::regclass) a | bigint | | not null | generated by default as identity
b | bigint | | not null | nextval('generated_identities.generated_identities_test_b_seq'::regclass) b | integer | | |
c | smallint | | not null | worker_nextval('generated_identities.generated_identities_test_c_seq'::regclass)
d | integer | | not null | worker_nextval('generated_identities.generated_identities_test_d_seq'::regclass)
e | bigint | | not null | nextval('generated_identities.generated_identities_test_e_seq'::regclass)
f | smallint | | not null | worker_nextval('generated_identities.generated_identities_test_f_seq'::regclass)
g | integer | | |
Partition key: RANGE (a)
Number of partitions: 2 (Use \d+ to list them.)
\c - - - :master_port \c - - - :master_port
SET search_path TO generated_identities; SET search_path TO generated_identities;
SET client_min_messages to ERROR; SET client_min_messages to ERROR;
insert into generated_identities_test (g) values (1); INSERT INTO bigint_identity_column (b)
insert into generated_identities_test (g) SELECT 2; SELECT s FROM generate_series(11,20) s;
INSERT INTO generated_identities_test (g) SELECT * FROM bigint_identity_column ORDER BY B ASC;
a | b
---------------------------------------------------------------------
3940649673949185 | 1
3940649673949186 | 2
3940649673949187 | 3
3940649673949188 | 4
3940649673949189 | 5
3940649673949190 | 6
3940649673949191 | 7
3940649673949192 | 8
3940649673949193 | 9
3940649673949194 | 10
1 | 11
2 | 12
3 | 13
4 | 14
5 | 15
6 | 16
7 | 17
8 | 18
9 | 19
10 | 20
(20 rows)
-- table with identity column cannot be altered.
SELECT alter_distributed_table('bigint_identity_column', 'b');
ERROR: cannot complete operation on a table with identity column
-- table with identity column cannot be undistributed.
SELECT undistribute_table('bigint_identity_column');
ERROR: cannot complete operation on a table with identity column
DROP TABLE bigint_identity_column;
-- create a partitioned table for testing.
CREATE TABLE partitioned_table (
a bigint CONSTRAINT myconname GENERATED BY DEFAULT AS IDENTITY (START WITH 10 INCREMENT BY 10),
b bigint GENERATED ALWAYS AS IDENTITY (START WITH 10 INCREMENT BY 10),
c int
)
PARTITION BY RANGE (c);
CREATE TABLE partitioned_table_1_50 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (50);
CREATE TABLE partitioned_table_50_500 PARTITION OF partitioned_table FOR VALUES FROM (50) TO (1000);
SELECT create_distributed_table('partitioned_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\d partitioned_table
Partitioned table "generated_identities.partitioned_table"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | bigint | | not null | generated by default as identity
b | bigint | | not null | generated always as identity
c | integer | | |
Partition key: RANGE (c)
Number of partitions: 2 (Use \d+ to list them.)
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
\d generated_identities.partitioned_table
Partitioned table "generated_identities.partitioned_table"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | bigint | | not null | generated by default as identity
b | bigint | | not null | generated always as identity
c | integer | | |
Partition key: RANGE (c)
Number of partitions: 2 (Use \d+ to list them.)
insert into partitioned_table (c) values (1);
insert into partitioned_table (c) SELECT 2;
INSERT INTO partitioned_table (c)
SELECT s FROM generate_series(3,7) s; SELECT s FROM generate_series(3,7) s;
SELECT * FROM generated_identities_test ORDER BY 1;
a | b | c | d | e | f | g
---------------------------------------------------------------------
1 | 10 | 1 | 1 | 1 | 1 | 1
2 | 20 | 2 | 2 | 2 | 2 | 2
3 | 30 | 3 | 3 | 3 | 3 | 3
4 | 40 | 4 | 4 | 4 | 4 | 4
5 | 50 | 5 | 5 | 5 | 5 | 5
6 | 60 | 6 | 6 | 6 | 6 | 6
7 | 70 | 7 | 7 | 7 | 7 | 7
(7 rows)
SELECT undistribute_table('generated_identities_test');
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT * FROM generated_identities_test ORDER BY 1;
a | b | c | d | e | f | g
---------------------------------------------------------------------
1 | 10 | 1 | 1 | 1 | 1 | 1
2 | 20 | 2 | 2 | 2 | 2 | 2
3 | 30 | 3 | 3 | 3 | 3 | 3
4 | 40 | 4 | 4 | 4 | 4 | 4
5 | 50 | 5 | 5 | 5 | 5 | 5
6 | 60 | 6 | 6 | 6 | 6 | 6
7 | 70 | 7 | 7 | 7 | 7 | 7
(7 rows)
\d generated_identities_test
Partitioned table "generated_identities.generated_identities_test"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | integer | | not null | generated by default as identity
b | bigint | | not null | generated always as identity
c | smallint | | not null | generated by default as identity
d | integer | | not null | nextval('generated_identities_test_d_seq'::regclass)
e | bigint | | not null | nextval('generated_identities_test_e_seq'::regclass)
f | smallint | | not null | nextval('generated_identities_test_f_seq'::regclass)
g | integer | | |
Partition key: RANGE (a)
Number of partitions: 2 (Use \d+ to list them.)
\c - - - :worker_1_port
\d generated_identities.generated_identities_test
\c - - - :master_port \c - - - :master_port
SET search_path TO generated_identities; SET search_path TO generated_identities;
SET client_min_messages to ERROR; SET client_min_messages to ERROR;
INSERT INTO generated_identities_test (g) INSERT INTO partitioned_table (c)
SELECT s FROM generate_series(8,10) s; SELECT s FROM generate_series(10,20) s;
SELECT * FROM generated_identities_test ORDER BY 1; INSERT INTO partitioned_table (a,c) VALUES (998,998);
a | b | c | d | e | f | g INSERT INTO partitioned_table (a,b,c) OVERRIDING SYSTEM VALUE VALUES (999,999,999);
SELECT * FROM partitioned_table ORDER BY c ASC;
a | b | c
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 | 10 | 1 | 1 | 1 | 1 | 1 3940649673949185 | 3940649673949185 | 1
2 | 20 | 2 | 2 | 2 | 2 | 2 3940649673949195 | 3940649673949195 | 2
3 | 30 | 3 | 3 | 3 | 3 | 3 3940649673949205 | 3940649673949205 | 3
4 | 40 | 4 | 4 | 4 | 4 | 4 3940649673949215 | 3940649673949215 | 4
5 | 50 | 5 | 5 | 5 | 5 | 5 3940649673949225 | 3940649673949225 | 5
6 | 60 | 6 | 6 | 6 | 6 | 6 3940649673949235 | 3940649673949235 | 6
7 | 70 | 7 | 7 | 7 | 7 | 7 3940649673949245 | 3940649673949245 | 7
8 | 80 | 8 | 8 | 8 | 8 | 8 10 | 10 | 10
9 | 90 | 9 | 9 | 9 | 9 | 9 20 | 20 | 11
10 | 100 | 10 | 10 | 10 | 10 | 10 30 | 30 | 12
(10 rows) 40 | 40 | 13
50 | 50 | 14
-- distributed table 60 | 60 | 15
SELECT create_distributed_table('generated_identities_test', 'a'); 70 | 70 | 16
create_distributed_table 80 | 80 | 17
--------------------------------------------------------------------- 90 | 90 | 18
100 | 100 | 19
(1 row) 110 | 110 | 20
998 | 120 | 998
999 | 999 | 999
(20 rows)
-- alter table .. alter column .. add is unsupported -- alter table .. alter column .. add is unsupported
ALTER TABLE generated_identities_test ALTER COLUMN g ADD GENERATED ALWAYS AS IDENTITY; ALTER TABLE partitioned_table ALTER COLUMN g ADD GENERATED ALWAYS AS IDENTITY;
ERROR: alter table command is currently unsupported ERROR: alter table command is currently unsupported
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP|VALIDATE CONSTRAINT, SET (), RESET (), ENABLE|DISABLE|NO FORCE|FORCE ROW LEVEL SECURITY, ATTACH|DETACH PARTITION and TYPE subcommands are supported. DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP|VALIDATE CONSTRAINT, SET (), RESET (), ENABLE|DISABLE|NO FORCE|FORCE ROW LEVEL SECURITY, ATTACH|DETACH PARTITION and TYPE subcommands are supported.
-- alter table .. alter column is unsupported -- alter table .. alter column is unsupported
ALTER TABLE generated_identities_test ALTER COLUMN b TYPE int; ALTER TABLE partitioned_table ALTER COLUMN b TYPE int;
ERROR: cannot execute ALTER COLUMN command involving identity column ERROR: cannot execute ALTER COLUMN command involving identity column
SELECT alter_distributed_table('generated_identities_test', 'g'); DROP TABLE partitioned_table;
alter_distributed_table -- create a table for reference table testing.
--------------------------------------------------------------------- CREATE TABLE reference_table (
a bigint CONSTRAINT myconname GENERATED BY DEFAULT AS IDENTITY (START WITH 10 INCREMENT BY 10),
(1 row) b bigint GENERATED ALWAYS AS IDENTITY (START WITH 10 INCREMENT BY 10) UNIQUE,
c int
SELECT alter_distributed_table('generated_identities_test', 'b');
alter_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT alter_distributed_table('generated_identities_test', 'c');
alter_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT undistribute_table('generated_identities_test');
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT * FROM generated_identities_test ORDER BY g;
a | b | c | d | e | f | g
---------------------------------------------------------------------
1 | 10 | 1 | 1 | 1 | 1 | 1
2 | 20 | 2 | 2 | 2 | 2 | 2
3 | 30 | 3 | 3 | 3 | 3 | 3
4 | 40 | 4 | 4 | 4 | 4 | 4
5 | 50 | 5 | 5 | 5 | 5 | 5
6 | 60 | 6 | 6 | 6 | 6 | 6
7 | 70 | 7 | 7 | 7 | 7 | 7
8 | 80 | 8 | 8 | 8 | 8 | 8
9 | 90 | 9 | 9 | 9 | 9 | 9
10 | 100 | 10 | 10 | 10 | 10 | 10
(10 rows)
-- reference table
DROP TABLE generated_identities_test;
CREATE TABLE generated_identities_test (
a int GENERATED BY DEFAULT AS IDENTITY,
b bigint GENERATED ALWAYS AS IDENTITY (START WITH 10 INCREMENT BY 10),
c smallint GENERATED BY DEFAULT AS IDENTITY,
d serial,
e bigserial,
f smallserial,
g int
); );
SELECT create_reference_table('generated_identities_test'); SELECT create_reference_table('reference_table');
create_reference_table create_reference_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
\d generated_identities_test \d reference_table
Table "generated_identities.generated_identities_test" Table "generated_identities.reference_table"
Column | Type | Collation | Nullable | Default Column | Type | Collation | Nullable | Default
--------------------------------------------------------------------- ---------------------------------------------------------------------
a | integer | | not null | generated by default as identity a | bigint | | not null | generated by default as identity
b | bigint | | not null | generated always as identity b | bigint | | not null | generated always as identity
c | smallint | | not null | generated by default as identity c | integer | | |
d | integer | | not null | nextval('generated_identities_test_d_seq'::regclass) Indexes:
e | bigint | | not null | nextval('generated_identities_test_e_seq'::regclass) "reference_table_b_key" UNIQUE CONSTRAINT, btree (b)
f | smallint | | not null | nextval('generated_identities_test_f_seq'::regclass)
g | integer | | |
\c - - - :worker_1_port \c - - - :worker_1_port
\d generated_identities.generated_identities_test SET search_path TO generated_identities;
Table "generated_identities.generated_identities_test" \d generated_identities.reference_table
Table "generated_identities.reference_table"
Column | Type | Collation | Nullable | Default Column | Type | Collation | Nullable | Default
--------------------------------------------------------------------- ---------------------------------------------------------------------
a | integer | | not null | worker_nextval('generated_identities.generated_identities_test_a_seq'::regclass) a | bigint | | not null | generated by default as identity
b | bigint | | not null | nextval('generated_identities.generated_identities_test_b_seq'::regclass) b | bigint | | not null | generated always as identity
c | smallint | | not null | worker_nextval('generated_identities.generated_identities_test_c_seq'::regclass) c | integer | | |
d | integer | | not null | worker_nextval('generated_identities.generated_identities_test_d_seq'::regclass) Indexes:
e | bigint | | not null | nextval('generated_identities.generated_identities_test_e_seq'::regclass) "reference_table_b_key" UNIQUE CONSTRAINT, btree (b)
f | smallint | | not null | worker_nextval('generated_identities.generated_identities_test_f_seq'::regclass)
g | integer | | | INSERT INTO reference_table (c)
SELECT s FROM generate_series(1,10) s;
--on master
select * from reference_table;
a | b | c
---------------------------------------------------------------------
3940649673949185 | 3940649673949185 | 1
3940649673949195 | 3940649673949195 | 2
3940649673949205 | 3940649673949205 | 3
3940649673949215 | 3940649673949215 | 4
3940649673949225 | 3940649673949225 | 5
3940649673949235 | 3940649673949235 | 6
3940649673949245 | 3940649673949245 | 7
3940649673949255 | 3940649673949255 | 8
3940649673949265 | 3940649673949265 | 9
3940649673949275 | 3940649673949275 | 10
(10 rows)
\c - - - :master_port \c - - - :master_port
SET search_path TO generated_identities; SET search_path TO generated_identities;
SET client_min_messages to ERROR; SET client_min_messages to ERROR;
INSERT INTO generated_identities_test (g) INSERT INTO reference_table (c)
SELECT s FROM generate_series(11,20) s; SELECT s FROM generate_series(11,20) s;
SELECT * FROM generated_identities_test ORDER BY g; SELECT * FROM reference_table ORDER BY c ASC;
a | b | c | d | e | f | g a | b | c
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 | 10 | 1 | 1 | 1 | 1 | 11 3940649673949185 | 3940649673949185 | 1
2 | 20 | 2 | 2 | 2 | 2 | 12 3940649673949195 | 3940649673949195 | 2
3 | 30 | 3 | 3 | 3 | 3 | 13 3940649673949205 | 3940649673949205 | 3
4 | 40 | 4 | 4 | 4 | 4 | 14 3940649673949215 | 3940649673949215 | 4
5 | 50 | 5 | 5 | 5 | 5 | 15 3940649673949225 | 3940649673949225 | 5
6 | 60 | 6 | 6 | 6 | 6 | 16 3940649673949235 | 3940649673949235 | 6
7 | 70 | 7 | 7 | 7 | 7 | 17 3940649673949245 | 3940649673949245 | 7
8 | 80 | 8 | 8 | 8 | 8 | 18 3940649673949255 | 3940649673949255 | 8
9 | 90 | 9 | 9 | 9 | 9 | 19 3940649673949265 | 3940649673949265 | 9
10 | 100 | 10 | 10 | 10 | 10 | 20 3940649673949275 | 3940649673949275 | 10
(10 rows) 10 | 10 | 11
20 | 20 | 12
30 | 30 | 13
40 | 40 | 14
50 | 50 | 15
60 | 60 | 16
70 | 70 | 17
80 | 80 | 18
90 | 90 | 19
100 | 100 | 20
(20 rows)
SELECT undistribute_table('generated_identities_test'); DROP TABLE reference_table;
undistribute_table CREATE TABLE color (
color_id BIGINT GENERATED ALWAYS AS IDENTITY UNIQUE,
color_name VARCHAR NOT NULL
);
-- https://github.com/citusdata/citus/issues/6694
CREATE USER identity_test_user;
GRANT INSERT ON color TO identity_test_user;
GRANT USAGE ON SCHEMA generated_identities TO identity_test_user;
SET ROLE identity_test_user;
SELECT create_distributed_table('color', 'color_id');
ERROR: must be owner of table color
SET ROLE postgres;
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table_concurrently('color', 'color_id');
create_distributed_table_concurrently
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
\d generated_identities_test RESET citus.shard_replication_factor;
Table "generated_identities.generated_identities_test" \c - identity_test_user - :worker_1_port
Column | Type | Collation | Nullable | Default SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO color(color_name) VALUES ('Blue');
\c - postgres - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SET citus.next_shard_id TO 12400000;
DROP TABLE Color;
CREATE TABLE color (
color_id BIGINT GENERATED ALWAYS AS IDENTITY UNIQUE,
color_name VARCHAR NOT NULL
) USING columnar;
SELECT create_distributed_table('color', 'color_id');
create_distributed_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
a | integer | | not null | generated by default as identity
b | bigint | | not null | generated always as identity (1 row)
c | smallint | | not null | generated by default as identity
d | integer | | not null | nextval('generated_identities_test_d_seq'::regclass) INSERT INTO color(color_name) VALUES ('Blue');
e | bigint | | not null | nextval('generated_identities_test_e_seq'::regclass) \d+ color
f | smallint | | not null | nextval('generated_identities_test_f_seq'::regclass) Table "generated_identities.color"
g | integer | | | Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
---------------------------------------------------------------------
color_id | bigint | | not null | generated always as identity | plain | |
color_name | character varying | | not null | | extended | |
Indexes:
"color_color_id_key" UNIQUE CONSTRAINT, btree (color_id)
\c - - - :worker_1_port \c - - - :worker_1_port
\d generated_identities.generated_identities_test SET search_path TO generated_identities;
\c - - - :master_port \d+ color
Table "generated_identities.color"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
---------------------------------------------------------------------
color_id | bigint | | not null | generated always as identity | plain | |
color_name | character varying | | not null | | extended | |
Indexes:
"color_color_id_key" UNIQUE CONSTRAINT, btree (color_id)
INSERT INTO color(color_name) VALUES ('Red');
-- alter sequence .. restart
ALTER SEQUENCE color_color_id_seq RESTART WITH 1000;
ERROR: Altering a distributed sequence is currently not supported.
-- override system value
INSERT INTO color(color_id, color_name) VALUES (1, 'Red');
ERROR: cannot insert a non-DEFAULT value into column "color_id"
DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
HINT: Use OVERRIDING SYSTEM VALUE to override.
INSERT INTO color(color_id, color_name) VALUES (NULL, 'Red');
ERROR: cannot insert a non-DEFAULT value into column "color_id"
DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
HINT: Use OVERRIDING SYSTEM VALUE to override.
INSERT INTO color(color_id, color_name) OVERRIDING SYSTEM VALUE VALUES (1, 'Red');
ERROR: duplicate key value violates unique constraint "color_color_id_key_12400000"
DETAIL: Key (color_id)=(1) already exists.
CONTEXT: while executing command on localhost:xxxxx
-- update null or custom value
UPDATE color SET color_id = NULL;
ERROR: column "color_id" can only be updated to DEFAULT
DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
UPDATE color SET color_id = 1;
ERROR: column "color_id" can only be updated to DEFAULT
DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
\c - postgres - :master_port
SET search_path TO generated_identities; SET search_path TO generated_identities;
SET client_min_messages to ERROR; SET client_min_messages to ERROR;
-- alter table .. add column .. GENERATED .. AS IDENTITY -- alter table .. add column .. GENERATED .. AS IDENTITY
DROP TABLE IF EXISTS color;
CREATE TABLE color (
color_name VARCHAR NOT NULL
);
SELECT create_distributed_table('color', 'color_name');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER TABLE color ADD COLUMN color_id BIGINT GENERATED ALWAYS AS IDENTITY; ALTER TABLE color ADD COLUMN color_id BIGINT GENERATED ALWAYS AS IDENTITY;
INSERT INTO color(color_name) VALUES ('Red'); ERROR: cannot execute ADD COLUMN commands involving identity columns when metadata is synchronized to workers
ALTER TABLE color ADD COLUMN color_id_1 BIGINT GENERATED ALWAYS AS IDENTITY; -- alter sequence .. restart
ERROR: Cannot add an identity column because the table is not empty ALTER SEQUENCE color_color_id_seq RESTART WITH 1000;
DROP TABLE color; ERROR: Altering a distributed sequence is currently not supported.
-- insert data from workers -- override system value
CREATE TABLE color ( INSERT INTO color(color_id, color_name) VALUES (1, 'Red');
color_id BIGINT GENERATED ALWAYS AS IDENTITY UNIQUE, ERROR: cannot insert a non-DEFAULT value into column "color_id"
color_name VARCHAR NOT NULL DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
); HINT: Use OVERRIDING SYSTEM VALUE to override.
SELECT create_distributed_table('color', 'color_id'); INSERT INTO color(color_id, color_name) VALUES (NULL, 'Red');
create_distributed_table ERROR: cannot insert a non-DEFAULT value into column "color_id"
--------------------------------------------------------------------- DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
HINT: Use OVERRIDING SYSTEM VALUE to override.
(1 row) INSERT INTO color(color_id, color_name) OVERRIDING SYSTEM VALUE VALUES (1, 'Red');
ERROR: duplicate key value violates unique constraint "color_color_id_key_12400000"
\c - - - :worker_1_port DETAIL: Key (color_id)=(1) already exists.
SET search_path TO generated_identities; CONTEXT: while executing command on localhost:xxxxx
SET client_min_messages to ERROR; -- update null or custom value
INSERT INTO color(color_name) VALUES ('Red'); UPDATE color SET color_id = NULL;
\c - - - :master_port ERROR: column "color_id" can only be updated to DEFAULT
SET search_path TO generated_identities; DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
SET client_min_messages to ERROR; UPDATE color SET color_id = 1;
SELECT undistribute_table('color'); ERROR: column "color_id" can only be updated to DEFAULT
undistribute_table DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('color', 'color_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO color(color_name) VALUES ('Red');
SELECT count(*) from color;
count
---------------------------------------------------------------------
3
(1 row)
-- modify sequence & alter table
DROP TABLE color;
CREATE TABLE color (
color_id BIGINT GENERATED ALWAYS AS IDENTITY UNIQUE,
color_name VARCHAR NOT NULL
);
SELECT create_distributed_table('color', 'color_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SELECT undistribute_table('color');
undistribute_table
---------------------------------------------------------------------
(1 row)
ALTER SEQUENCE color_color_id_seq RENAME TO myseq;
SELECT create_distributed_table('color', 'color_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\ds+ myseq
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
generated_identities | myseq | sequence | postgres | permanent | 8192 bytes |
(1 row)
\ds+ color_color_id_seq
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
(0 rows)
\d color
Table "generated_identities.color"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
color_id | bigint | | not null | generated always as identity
color_name | character varying | | not null |
Indexes:
"color_color_id_key" UNIQUE CONSTRAINT, btree (color_id)
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
\ds+ myseq
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
generated_identities | myseq | sequence | postgres | permanent | 8192 bytes |
(1 row)
\ds+ color_color_id_seq
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
(0 rows)
\d color
Table "generated_identities.color"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
color_id | bigint | | not null | nextval('myseq'::regclass)
color_name | character varying | | not null |
Indexes:
"color_color_id_key" UNIQUE CONSTRAINT, btree (color_id)
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
ALTER SEQUENCE myseq RENAME TO color_color_id_seq;
\ds+ myseq
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
(0 rows)
\ds+ color_color_id_seq
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
generated_identities | color_color_id_seq | sequence | postgres | permanent | 8192 bytes |
(1 row)
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
\ds+ myseq
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
(0 rows)
\ds+ color_color_id_seq
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
generated_identities | color_color_id_seq | sequence | postgres | permanent | 8192 bytes |
(1 row)
\d color
Table "generated_identities.color"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
color_id | bigint | | not null | nextval('color_color_id_seq'::regclass)
color_name | character varying | | not null |
Indexes:
"color_color_id_key" UNIQUE CONSTRAINT, btree (color_id)
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SELECT alter_distributed_table('co23423lor', shard_count := 6);
ERROR: relation "co23423lor" does not exist
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
\ds+ color_color_id_seq
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
---------------------------------------------------------------------
generated_identities | color_color_id_seq | sequence | postgres | permanent | 8192 bytes |
(1 row)
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
DROP TABLE IF EXISTS test; DROP TABLE IF EXISTS test;
CREATE TABLE test (x int, y int, z bigint generated by default as identity); CREATE TABLE test (x int, y int, z bigint generated by default as identity);
SELECT create_distributed_table('test', 'x', colocate_with := 'none'); SELECT create_distributed_table('test', 'x', colocate_with := 'none');
@ -540,3 +428,4 @@ SELECT * FROM test;
(2 rows) (2 rows)
DROP SCHEMA generated_identities CASCADE; DROP SCHEMA generated_identities CASCADE;
DROP USER identity_test_user;

View File

@ -0,0 +1,431 @@
-- This test file has an alternative output because of error messages vary for PG13
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int <= 13 AS server_version_le_13;
server_version_le_13
---------------------------------------------------------------------
t
(1 row)
CREATE SCHEMA generated_identities;
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SET citus.shard_replication_factor TO 1;
SELECT 1 from citus_add_node('localhost', :master_port, groupId=>0);
?column?
---------------------------------------------------------------------
1
(1 row)
-- smallint identity column can not be distributed
CREATE TABLE smallint_identity_column (
a smallint GENERATED BY DEFAULT AS IDENTITY
);
SELECT create_distributed_table('smallint_identity_column', 'a');
ERROR: cannot complete operation on generated_identities.smallint_identity_column with smallint/int identity column
HINT: Use bigint identity column instead.
SELECT create_distributed_table_concurrently('smallint_identity_column', 'a');
ERROR: cannot complete operation on generated_identities.smallint_identity_column with smallint/int identity column
HINT: Use bigint identity column instead.
SELECT create_reference_table('smallint_identity_column');
ERROR: cannot complete operation on a table with identity column
SELECT citus_add_local_table_to_metadata('smallint_identity_column');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
DROP TABLE smallint_identity_column;
-- int identity column can not be distributed
CREATE TABLE int_identity_column (
a int GENERATED BY DEFAULT AS IDENTITY
);
SELECT create_distributed_table('int_identity_column', 'a');
ERROR: cannot complete operation on generated_identities.int_identity_column with smallint/int identity column
HINT: Use bigint identity column instead.
SELECT create_distributed_table_concurrently('int_identity_column', 'a');
ERROR: cannot complete operation on generated_identities.int_identity_column with smallint/int identity column
HINT: Use bigint identity column instead.
SELECT create_reference_table('int_identity_column');
ERROR: cannot complete operation on a table with identity column
SELECT citus_add_local_table_to_metadata('int_identity_column');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
DROP TABLE int_identity_column;
RESET citus.shard_replication_factor;
CREATE TABLE bigint_identity_column (
a bigint GENERATED BY DEFAULT AS IDENTITY,
b int
);
SELECT citus_add_local_table_to_metadata('bigint_identity_column');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
DROP TABLE bigint_identity_column;
CREATE TABLE bigint_identity_column (
a bigint GENERATED BY DEFAULT AS IDENTITY,
b int
);
SELECT create_distributed_table('bigint_identity_column', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\d bigint_identity_column
Table "generated_identities.bigint_identity_column"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | bigint | | not null | generated by default as identity
b | integer | | |
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO bigint_identity_column (b)
SELECT s FROM generate_series(1,10) s;
\d generated_identities.bigint_identity_column
Table "generated_identities.bigint_identity_column"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | bigint | | not null | generated by default as identity
b | integer | | |
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO bigint_identity_column (b)
SELECT s FROM generate_series(11,20) s;
SELECT * FROM bigint_identity_column ORDER BY B ASC;
a | b
---------------------------------------------------------------------
3940649673949185 | 1
3940649673949186 | 2
3940649673949187 | 3
3940649673949188 | 4
3940649673949189 | 5
3940649673949190 | 6
3940649673949191 | 7
3940649673949192 | 8
3940649673949193 | 9
3940649673949194 | 10
1 | 11
2 | 12
3 | 13
4 | 14
5 | 15
6 | 16
7 | 17
8 | 18
9 | 19
10 | 20
(20 rows)
-- table with identity column cannot be altered.
SELECT alter_distributed_table('bigint_identity_column', 'b');
ERROR: cannot complete operation on a table with identity column
-- table with identity column cannot be undistributed.
SELECT undistribute_table('bigint_identity_column');
ERROR: cannot complete operation on a table with identity column
DROP TABLE bigint_identity_column;
-- create a partitioned table for testing.
CREATE TABLE partitioned_table (
a bigint CONSTRAINT myconname GENERATED BY DEFAULT AS IDENTITY (START WITH 10 INCREMENT BY 10),
b bigint GENERATED ALWAYS AS IDENTITY (START WITH 10 INCREMENT BY 10),
c int
)
PARTITION BY RANGE (c);
CREATE TABLE partitioned_table_1_50 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (50);
CREATE TABLE partitioned_table_50_500 PARTITION OF partitioned_table FOR VALUES FROM (50) TO (1000);
SELECT create_distributed_table('partitioned_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\d partitioned_table
Partitioned table "generated_identities.partitioned_table"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | bigint | | not null | generated by default as identity
b | bigint | | not null | generated always as identity
c | integer | | |
Partition key: RANGE (c)
Number of partitions: 2 (Use \d+ to list them.)
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
\d generated_identities.partitioned_table
Partitioned table "generated_identities.partitioned_table"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | bigint | | not null | generated by default as identity
b | bigint | | not null | generated always as identity
c | integer | | |
Partition key: RANGE (c)
Number of partitions: 2 (Use \d+ to list them.)
insert into partitioned_table (c) values (1);
insert into partitioned_table (c) SELECT 2;
INSERT INTO partitioned_table (c)
SELECT s FROM generate_series(3,7) s;
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO partitioned_table (c)
SELECT s FROM generate_series(10,20) s;
INSERT INTO partitioned_table (a,c) VALUES (998,998);
INSERT INTO partitioned_table (a,b,c) OVERRIDING SYSTEM VALUE VALUES (999,999,999);
SELECT * FROM partitioned_table ORDER BY c ASC;
a | b | c
---------------------------------------------------------------------
3940649673949185 | 3940649673949185 | 1
3940649673949195 | 3940649673949195 | 2
3940649673949205 | 3940649673949205 | 3
3940649673949215 | 3940649673949215 | 4
3940649673949225 | 3940649673949225 | 5
3940649673949235 | 3940649673949235 | 6
3940649673949245 | 3940649673949245 | 7
10 | 10 | 10
20 | 20 | 11
30 | 30 | 12
40 | 40 | 13
50 | 50 | 14
60 | 60 | 15
70 | 70 | 16
80 | 80 | 17
90 | 90 | 18
100 | 100 | 19
110 | 110 | 20
998 | 120 | 998
999 | 999 | 999
(20 rows)
-- alter table .. alter column .. add is unsupported
ALTER TABLE partitioned_table ALTER COLUMN g ADD GENERATED ALWAYS AS IDENTITY;
ERROR: alter table command is currently unsupported
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP|VALIDATE CONSTRAINT, SET (), RESET (), ENABLE|DISABLE|NO FORCE|FORCE ROW LEVEL SECURITY, ATTACH|DETACH PARTITION and TYPE subcommands are supported.
-- alter table .. alter column is unsupported
ALTER TABLE partitioned_table ALTER COLUMN b TYPE int;
ERROR: cannot execute ALTER COLUMN command involving identity column
DROP TABLE partitioned_table;
-- create a table for reference table testing.
CREATE TABLE reference_table (
a bigint CONSTRAINT myconname GENERATED BY DEFAULT AS IDENTITY (START WITH 10 INCREMENT BY 10),
b bigint GENERATED ALWAYS AS IDENTITY (START WITH 10 INCREMENT BY 10) UNIQUE,
c int
);
SELECT create_reference_table('reference_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
\d reference_table
Table "generated_identities.reference_table"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | bigint | | not null | generated by default as identity
b | bigint | | not null | generated always as identity
c | integer | | |
Indexes:
"reference_table_b_key" UNIQUE CONSTRAINT, btree (b)
\c - - - :worker_1_port
SET search_path TO generated_identities;
\d generated_identities.reference_table
Table "generated_identities.reference_table"
Column | Type | Collation | Nullable | Default
---------------------------------------------------------------------
a | bigint | | not null | generated by default as identity
b | bigint | | not null | generated always as identity
c | integer | | |
Indexes:
"reference_table_b_key" UNIQUE CONSTRAINT, btree (b)
INSERT INTO reference_table (c)
SELECT s FROM generate_series(1,10) s;
--on master
select * from reference_table;
a | b | c
---------------------------------------------------------------------
3940649673949185 | 3940649673949185 | 1
3940649673949195 | 3940649673949195 | 2
3940649673949205 | 3940649673949205 | 3
3940649673949215 | 3940649673949215 | 4
3940649673949225 | 3940649673949225 | 5
3940649673949235 | 3940649673949235 | 6
3940649673949245 | 3940649673949245 | 7
3940649673949255 | 3940649673949255 | 8
3940649673949265 | 3940649673949265 | 9
3940649673949275 | 3940649673949275 | 10
(10 rows)
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO reference_table (c)
SELECT s FROM generate_series(11,20) s;
SELECT * FROM reference_table ORDER BY c ASC;
a | b | c
---------------------------------------------------------------------
3940649673949185 | 3940649673949185 | 1
3940649673949195 | 3940649673949195 | 2
3940649673949205 | 3940649673949205 | 3
3940649673949215 | 3940649673949215 | 4
3940649673949225 | 3940649673949225 | 5
3940649673949235 | 3940649673949235 | 6
3940649673949245 | 3940649673949245 | 7
3940649673949255 | 3940649673949255 | 8
3940649673949265 | 3940649673949265 | 9
3940649673949275 | 3940649673949275 | 10
10 | 10 | 11
20 | 20 | 12
30 | 30 | 13
40 | 40 | 14
50 | 50 | 15
60 | 60 | 16
70 | 70 | 17
80 | 80 | 18
90 | 90 | 19
100 | 100 | 20
(20 rows)
DROP TABLE reference_table;
CREATE TABLE color (
color_id BIGINT GENERATED ALWAYS AS IDENTITY UNIQUE,
color_name VARCHAR NOT NULL
);
-- https://github.com/citusdata/citus/issues/6694
CREATE USER identity_test_user;
GRANT INSERT ON color TO identity_test_user;
GRANT USAGE ON SCHEMA generated_identities TO identity_test_user;
SET ROLE identity_test_user;
SELECT create_distributed_table('color', 'color_id');
ERROR: must be owner of table color
SET ROLE postgres;
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table_concurrently('color', 'color_id');
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
RESET citus.shard_replication_factor;
\c - identity_test_user - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO color(color_name) VALUES ('Blue');
\c - postgres - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SET citus.next_shard_id TO 12400000;
DROP TABLE Color;
CREATE TABLE color (
color_id BIGINT GENERATED ALWAYS AS IDENTITY UNIQUE,
color_name VARCHAR NOT NULL
) USING columnar;
SELECT create_distributed_table('color', 'color_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO color(color_name) VALUES ('Blue');
\d+ color
Table "generated_identities.color"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
---------------------------------------------------------------------
color_id | bigint | | not null | generated always as identity | plain | |
color_name | character varying | | not null | | extended | |
Indexes:
"color_color_id_key" UNIQUE CONSTRAINT, btree (color_id)
\c - - - :worker_1_port
SET search_path TO generated_identities;
\d+ color
Table "generated_identities.color"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
---------------------------------------------------------------------
color_id | bigint | | not null | generated always as identity | plain | |
color_name | character varying | | not null | | extended | |
Indexes:
"color_color_id_key" UNIQUE CONSTRAINT, btree (color_id)
INSERT INTO color(color_name) VALUES ('Red');
-- alter sequence .. restart
ALTER SEQUENCE color_color_id_seq RESTART WITH 1000;
ERROR: Altering a distributed sequence is currently not supported.
-- override system value
INSERT INTO color(color_id, color_name) VALUES (1, 'Red');
ERROR: cannot insert into column "color_id"
DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
HINT: Use OVERRIDING SYSTEM VALUE to override.
INSERT INTO color(color_id, color_name) VALUES (NULL, 'Red');
ERROR: cannot insert into column "color_id"
DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
HINT: Use OVERRIDING SYSTEM VALUE to override.
INSERT INTO color(color_id, color_name) OVERRIDING SYSTEM VALUE VALUES (1, 'Red');
ERROR: duplicate key value violates unique constraint "color_color_id_key_12400000"
DETAIL: Key (color_id)=(1) already exists.
CONTEXT: while executing command on localhost:xxxxx
-- update null or custom value
UPDATE color SET color_id = NULL;
ERROR: column "color_id" can only be updated to DEFAULT
DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
UPDATE color SET color_id = 1;
ERROR: column "color_id" can only be updated to DEFAULT
DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
\c - postgres - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
-- alter table .. add column .. GENERATED .. AS IDENTITY
ALTER TABLE color ADD COLUMN color_id BIGINT GENERATED ALWAYS AS IDENTITY;
ERROR: cannot execute ADD COLUMN commands involving identity columns when metadata is synchronized to workers
-- alter sequence .. restart
ALTER SEQUENCE color_color_id_seq RESTART WITH 1000;
ERROR: Altering a distributed sequence is currently not supported.
-- override system value
INSERT INTO color(color_id, color_name) VALUES (1, 'Red');
ERROR: cannot insert into column "color_id"
DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
HINT: Use OVERRIDING SYSTEM VALUE to override.
INSERT INTO color(color_id, color_name) VALUES (NULL, 'Red');
ERROR: cannot insert into column "color_id"
DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
HINT: Use OVERRIDING SYSTEM VALUE to override.
INSERT INTO color(color_id, color_name) OVERRIDING SYSTEM VALUE VALUES (1, 'Red');
ERROR: duplicate key value violates unique constraint "color_color_id_key_12400000"
DETAIL: Key (color_id)=(1) already exists.
CONTEXT: while executing command on localhost:xxxxx
-- update null or custom value
UPDATE color SET color_id = NULL;
ERROR: column "color_id" can only be updated to DEFAULT
DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
UPDATE color SET color_id = 1;
ERROR: column "color_id" can only be updated to DEFAULT
DETAIL: Column "color_id" is an identity column defined as GENERATED ALWAYS.
DROP TABLE IF EXISTS test;
CREATE TABLE test (x int, y int, z bigint generated by default as identity);
SELECT create_distributed_table('test', 'x', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test VALUES (1,2);
INSERT INTO test SELECT x, y FROM test WHERE x = 1;
SELECT * FROM test;
x | y | z
---------------------------------------------------------------------
1 | 2 | 1
1 | 2 | 2
(2 rows)
DROP SCHEMA generated_identities CASCADE;
DROP USER identity_test_user;

View File

@ -1365,7 +1365,8 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_internal_is_replication_origin_tracking_active() boolean | function citus_internal_is_replication_origin_tracking_active() boolean
| function citus_internal_start_replication_origin_tracking() void | function citus_internal_start_replication_origin_tracking() void
| function citus_internal_stop_replication_origin_tracking() void | function citus_internal_stop_replication_origin_tracking() void
(3 rows) | function worker_adjust_identity_column_seq_ranges(regclass) void
(4 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version -- show running version

View File

@ -233,6 +233,7 @@ ORDER BY 1;
function truncate_local_data_after_distributing_table(regclass) function truncate_local_data_after_distributing_table(regclass)
function undistribute_table(regclass,boolean) function undistribute_table(regclass,boolean)
function update_distributed_table_colocation(regclass,text) function update_distributed_table_colocation(regclass,text)
function worker_adjust_identity_column_seq_ranges(regclass)
function worker_apply_inter_shard_ddl_command(bigint,text,bigint,text,text) function worker_apply_inter_shard_ddl_command(bigint,text,bigint,text,text)
function worker_apply_sequence_command(text) function worker_apply_sequence_command(text)
function worker_apply_sequence_command(text,regtype) function worker_apply_sequence_command(text,regtype)
@ -321,5 +322,5 @@ ORDER BY 1;
view citus_stat_statements view citus_stat_statements
view pg_dist_shard_placement view pg_dist_shard_placement
view time_partitions view time_partitions
(313 rows) (314 rows)

View File

@ -1,267 +1,228 @@
-- This test file has an alternative output because of error messages vary for PG13
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int <= 13 AS server_version_le_13;
CREATE SCHEMA generated_identities; CREATE SCHEMA generated_identities;
SET search_path TO generated_identities; SET search_path TO generated_identities;
SET client_min_messages to ERROR; SET client_min_messages to ERROR;
SET citus.shard_replication_factor TO 1;
SELECT 1 from citus_add_node('localhost', :master_port, groupId=>0); SELECT 1 from citus_add_node('localhost', :master_port, groupId=>0);
DROP TABLE IF EXISTS generated_identities_test; -- smallint identity column can not be distributed
CREATE TABLE smallint_identity_column (
-- create a partitioned table for testing. a smallint GENERATED BY DEFAULT AS IDENTITY
CREATE TABLE generated_identities_test (
a int CONSTRAINT myconname GENERATED BY DEFAULT AS IDENTITY,
b bigint GENERATED ALWAYS AS IDENTITY (START WITH 10 INCREMENT BY 10),
c smallint GENERATED BY DEFAULT AS IDENTITY,
d serial,
e bigserial,
f smallserial,
g int
)
PARTITION BY RANGE (a);
CREATE TABLE generated_identities_test_1_5 PARTITION OF generated_identities_test FOR VALUES FROM (1) TO (5);
CREATE TABLE generated_identities_test_5_50 PARTITION OF generated_identities_test FOR VALUES FROM (5) TO (50);
-- local tables
SELECT citus_add_local_table_to_metadata('generated_identities_test');
\d generated_identities_test
\c - - - :worker_1_port
\d generated_identities.generated_identities_test
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SELECT undistribute_table('generated_identities_test');
SELECT citus_remove_node('localhost', :master_port);
SELECT create_distributed_table('generated_identities_test', 'a');
\d generated_identities_test
\c - - - :worker_1_port
\d generated_identities.generated_identities_test
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
insert into generated_identities_test (g) values (1);
insert into generated_identities_test (g) SELECT 2;
INSERT INTO generated_identities_test (g)
SELECT s FROM generate_series(3,7) s;
SELECT * FROM generated_identities_test ORDER BY 1;
SELECT undistribute_table('generated_identities_test');
SELECT * FROM generated_identities_test ORDER BY 1;
\d generated_identities_test
\c - - - :worker_1_port
\d generated_identities.generated_identities_test
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO generated_identities_test (g)
SELECT s FROM generate_series(8,10) s;
SELECT * FROM generated_identities_test ORDER BY 1;
-- distributed table
SELECT create_distributed_table('generated_identities_test', 'a');
-- alter table .. alter column .. add is unsupported
ALTER TABLE generated_identities_test ALTER COLUMN g ADD GENERATED ALWAYS AS IDENTITY;
-- alter table .. alter column is unsupported
ALTER TABLE generated_identities_test ALTER COLUMN b TYPE int;
SELECT alter_distributed_table('generated_identities_test', 'g');
SELECT alter_distributed_table('generated_identities_test', 'b');
SELECT alter_distributed_table('generated_identities_test', 'c');
SELECT undistribute_table('generated_identities_test');
SELECT * FROM generated_identities_test ORDER BY g;
-- reference table
DROP TABLE generated_identities_test;
CREATE TABLE generated_identities_test (
a int GENERATED BY DEFAULT AS IDENTITY,
b bigint GENERATED ALWAYS AS IDENTITY (START WITH 10 INCREMENT BY 10),
c smallint GENERATED BY DEFAULT AS IDENTITY,
d serial,
e bigserial,
f smallserial,
g int
); );
SELECT create_distributed_table('smallint_identity_column', 'a');
SELECT create_distributed_table_concurrently('smallint_identity_column', 'a');
SELECT create_reference_table('smallint_identity_column');
SELECT citus_add_local_table_to_metadata('smallint_identity_column');
SELECT create_reference_table('generated_identities_test'); DROP TABLE smallint_identity_column;
\d generated_identities_test -- int identity column can not be distributed
CREATE TABLE int_identity_column (
a int GENERATED BY DEFAULT AS IDENTITY
);
SELECT create_distributed_table('int_identity_column', 'a');
SELECT create_distributed_table_concurrently('int_identity_column', 'a');
SELECT create_reference_table('int_identity_column');
SELECT citus_add_local_table_to_metadata('int_identity_column');
DROP TABLE int_identity_column;
RESET citus.shard_replication_factor;
CREATE TABLE bigint_identity_column (
a bigint GENERATED BY DEFAULT AS IDENTITY,
b int
);
SELECT citus_add_local_table_to_metadata('bigint_identity_column');
DROP TABLE bigint_identity_column;
CREATE TABLE bigint_identity_column (
a bigint GENERATED BY DEFAULT AS IDENTITY,
b int
);
SELECT create_distributed_table('bigint_identity_column', 'a');
\d bigint_identity_column
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
\d generated_identities.generated_identities_test INSERT INTO bigint_identity_column (b)
SELECT s FROM generate_series(1,10) s;
\d generated_identities.bigint_identity_column
\c - - - :master_port \c - - - :master_port
SET search_path TO generated_identities; SET search_path TO generated_identities;
SET client_min_messages to ERROR; SET client_min_messages to ERROR;
INSERT INTO generated_identities_test (g) INSERT INTO bigint_identity_column (b)
SELECT s FROM generate_series(11,20) s; SELECT s FROM generate_series(11,20) s;
SELECT * FROM generated_identities_test ORDER BY g; SELECT * FROM bigint_identity_column ORDER BY B ASC;
SELECT undistribute_table('generated_identities_test'); -- table with identity column cannot be altered.
SELECT alter_distributed_table('bigint_identity_column', 'b');
\d generated_identities_test -- table with identity column cannot be undistributed.
SELECT undistribute_table('bigint_identity_column');
DROP TABLE bigint_identity_column;
-- create a partitioned table for testing.
CREATE TABLE partitioned_table (
a bigint CONSTRAINT myconname GENERATED BY DEFAULT AS IDENTITY (START WITH 10 INCREMENT BY 10),
b bigint GENERATED ALWAYS AS IDENTITY (START WITH 10 INCREMENT BY 10),
c int
)
PARTITION BY RANGE (c);
CREATE TABLE partitioned_table_1_50 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (50);
CREATE TABLE partitioned_table_50_500 PARTITION OF partitioned_table FOR VALUES FROM (50) TO (1000);
SELECT create_distributed_table('partitioned_table', 'a');
\d partitioned_table
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
\d generated_identities.generated_identities_test \d generated_identities.partitioned_table
insert into partitioned_table (c) values (1);
insert into partitioned_table (c) SELECT 2;
INSERT INTO partitioned_table (c)
SELECT s FROM generate_series(3,7) s;
\c - - - :master_port \c - - - :master_port
SET search_path TO generated_identities; SET search_path TO generated_identities;
SET client_min_messages to ERROR; SET client_min_messages to ERROR;
INSERT INTO partitioned_table (c)
SELECT s FROM generate_series(10,20) s;
INSERT INTO partitioned_table (a,c) VALUES (998,998);
INSERT INTO partitioned_table (a,b,c) OVERRIDING SYSTEM VALUE VALUES (999,999,999);
SELECT * FROM partitioned_table ORDER BY c ASC;
-- alter table .. alter column .. add is unsupported
ALTER TABLE partitioned_table ALTER COLUMN g ADD GENERATED ALWAYS AS IDENTITY;
-- alter table .. alter column is unsupported
ALTER TABLE partitioned_table ALTER COLUMN b TYPE int;
DROP TABLE partitioned_table;
-- create a table for reference table testing.
CREATE TABLE reference_table (
a bigint CONSTRAINT myconname GENERATED BY DEFAULT AS IDENTITY (START WITH 10 INCREMENT BY 10),
b bigint GENERATED ALWAYS AS IDENTITY (START WITH 10 INCREMENT BY 10) UNIQUE,
c int
);
SELECT create_reference_table('reference_table');
\d reference_table
\c - - - :worker_1_port
SET search_path TO generated_identities;
\d generated_identities.reference_table
INSERT INTO reference_table (c)
SELECT s FROM generate_series(1,10) s;
--on master
select * from reference_table;
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO reference_table (c)
SELECT s FROM generate_series(11,20) s;
SELECT * FROM reference_table ORDER BY c ASC;
DROP TABLE reference_table;
CREATE TABLE color (
color_id BIGINT GENERATED ALWAYS AS IDENTITY UNIQUE,
color_name VARCHAR NOT NULL
);
-- https://github.com/citusdata/citus/issues/6694
CREATE USER identity_test_user;
GRANT INSERT ON color TO identity_test_user;
GRANT USAGE ON SCHEMA generated_identities TO identity_test_user;
SET ROLE identity_test_user;
SELECT create_distributed_table('color', 'color_id');
SET ROLE postgres;
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table_concurrently('color', 'color_id');
RESET citus.shard_replication_factor;
\c - identity_test_user - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO color(color_name) VALUES ('Blue');
\c - postgres - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SET citus.next_shard_id TO 12400000;
DROP TABLE Color;
CREATE TABLE color (
color_id BIGINT GENERATED ALWAYS AS IDENTITY UNIQUE,
color_name VARCHAR NOT NULL
) USING columnar;
SELECT create_distributed_table('color', 'color_id');
INSERT INTO color(color_name) VALUES ('Blue');
\d+ color
\c - - - :worker_1_port
SET search_path TO generated_identities;
\d+ color
INSERT INTO color(color_name) VALUES ('Red');
-- alter sequence .. restart
ALTER SEQUENCE color_color_id_seq RESTART WITH 1000;
-- override system value
INSERT INTO color(color_id, color_name) VALUES (1, 'Red');
INSERT INTO color(color_id, color_name) VALUES (NULL, 'Red');
INSERT INTO color(color_id, color_name) OVERRIDING SYSTEM VALUE VALUES (1, 'Red');
-- update null or custom value
UPDATE color SET color_id = NULL;
UPDATE color SET color_id = 1;
\c - postgres - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
-- alter table .. add column .. GENERATED .. AS IDENTITY -- alter table .. add column .. GENERATED .. AS IDENTITY
DROP TABLE IF EXISTS color;
CREATE TABLE color (
color_name VARCHAR NOT NULL
);
SELECT create_distributed_table('color', 'color_name');
ALTER TABLE color ADD COLUMN color_id BIGINT GENERATED ALWAYS AS IDENTITY; ALTER TABLE color ADD COLUMN color_id BIGINT GENERATED ALWAYS AS IDENTITY;
INSERT INTO color(color_name) VALUES ('Red');
ALTER TABLE color ADD COLUMN color_id_1 BIGINT GENERATED ALWAYS AS IDENTITY;
DROP TABLE color;
-- insert data from workers -- alter sequence .. restart
CREATE TABLE color ( ALTER SEQUENCE color_color_id_seq RESTART WITH 1000;
color_id BIGINT GENERATED ALWAYS AS IDENTITY UNIQUE, -- override system value
color_name VARCHAR NOT NULL INSERT INTO color(color_id, color_name) VALUES (1, 'Red');
); INSERT INTO color(color_id, color_name) VALUES (NULL, 'Red');
SELECT create_distributed_table('color', 'color_id'); INSERT INTO color(color_id, color_name) OVERRIDING SYSTEM VALUE VALUES (1, 'Red');
-- update null or custom value
\c - - - :worker_1_port UPDATE color SET color_id = NULL;
SET search_path TO generated_identities; UPDATE color SET color_id = 1;
SET client_min_messages to ERROR;
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SELECT undistribute_table('color');
SELECT create_distributed_table('color', 'color_id');
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO color(color_name) VALUES ('Red');
SELECT count(*) from color;
-- modify sequence & alter table
DROP TABLE color;
CREATE TABLE color (
color_id BIGINT GENERATED ALWAYS AS IDENTITY UNIQUE,
color_name VARCHAR NOT NULL
);
SELECT create_distributed_table('color', 'color_id');
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SELECT undistribute_table('color');
ALTER SEQUENCE color_color_id_seq RENAME TO myseq;
SELECT create_distributed_table('color', 'color_id');
\ds+ myseq
\ds+ color_color_id_seq
\d color
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
\ds+ myseq
\ds+ color_color_id_seq
\d color
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
ALTER SEQUENCE myseq RENAME TO color_color_id_seq;
\ds+ myseq
\ds+ color_color_id_seq
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
\ds+ myseq
\ds+ color_color_id_seq
\d color
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
SELECT alter_distributed_table('co23423lor', shard_count := 6);
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :worker_1_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
\ds+ color_color_id_seq
INSERT INTO color(color_name) VALUES ('Red');
\c - - - :master_port
SET search_path TO generated_identities;
SET client_min_messages to ERROR;
DROP TABLE IF EXISTS test; DROP TABLE IF EXISTS test;
CREATE TABLE test (x int, y int, z bigint generated by default as identity); CREATE TABLE test (x int, y int, z bigint generated by default as identity);
@ -271,3 +232,4 @@ INSERT INTO test SELECT x, y FROM test WHERE x = 1;
SELECT * FROM test; SELECT * FROM test;
DROP SCHEMA generated_identities CASCADE; DROP SCHEMA generated_identities CASCADE;
DROP USER identity_test_user;