diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index a698fd164..57b737c8c 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -399,6 +399,8 @@ UndistributeTable(TableConversionParameters *params) ErrorIfUnsupportedCascadeObjects(params->relationId); + ErrorIfTableHasIdentityColumn(params->relationId); + params->conversionType = UNDISTRIBUTE_TABLE; params->shardCountIsNull = true; TableConversionState *con = CreateTableConversion(params); @@ -431,6 +433,7 @@ AlterDistributedTable(TableConversionParameters *params) EnsureHashDistributedTable(params->relationId); ErrorIfUnsupportedCascadeObjects(params->relationId); + ErrorIfTableHasIdentityColumn(params->relationId); params->conversionType = ALTER_DISTRIBUTED_TABLE; TableConversionState *con = CreateTableConversion(params); @@ -494,6 +497,7 @@ AlterTableSetAccessMethod(TableConversionParameters *params) } ErrorIfUnsupportedCascadeObjects(params->relationId); + ErrorIfTableHasIdentityColumn(params->relationId); params->conversionType = ALTER_TABLE_SET_ACCESS_METHOD; params->shardCountIsNull = true; @@ -1537,97 +1541,6 @@ CreateMaterializedViewDDLCommand(Oid matViewOid) return query->data; } - -/* - * This function marks all the identity sequences as distributed on the given table. - */ -static void -MarkIdentitiesAsDistributed(Oid targetRelationId) -{ - Relation relation = relation_open(targetRelationId, AccessShareLock); - TupleDesc tupleDescriptor = RelationGetDescr(relation); - relation_close(relation, NoLock); - - bool missingSequenceOk = false; - - for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts; - attributeIndex++) - { - Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex); - - if (attributeForm->attidentity) - { - Oid seqOid = getIdentitySequence(targetRelationId, attributeForm->attnum, - missingSequenceOk); - - ObjectAddress seqAddress = { 0 }; - ObjectAddressSet(seqAddress, RelationRelationId, seqOid); - MarkObjectDistributed(&seqAddress); - } - } -} - - -/* - * This function returns sql statements to rename identites on the given table - */ -static void -PrepareRenameIdentitiesCommands(Oid sourceRelationId, Oid targetRelationId, - List **outCoordinatorCommands, List **outWorkerCommands) -{ - Relation targetRelation = relation_open(targetRelationId, AccessShareLock); - TupleDesc targetTupleDescriptor = RelationGetDescr(targetRelation); - relation_close(targetRelation, NoLock); - - bool missingSequenceOk = false; - - for (int attributeIndex = 0; attributeIndex < targetTupleDescriptor->natts; - attributeIndex++) - { - Form_pg_attribute attributeForm = TupleDescAttr(targetTupleDescriptor, - attributeIndex); - - if (attributeForm->attidentity) - { - char *columnName = NameStr(attributeForm->attname); - - Oid targetSequenceOid = getIdentitySequence(targetRelationId, - attributeForm->attnum, - missingSequenceOk); - char *targetSequenceName = generate_relation_name(targetSequenceOid, NIL); - - Oid sourceSequenceOid = getIdentitySequence(sourceRelationId, - attributeForm->attnum, - missingSequenceOk); - char *sourceSequenceName = generate_relation_name(sourceSequenceOid, NIL); - - /* to rename sequence on the coordinator */ - *outCoordinatorCommands = lappend(*outCoordinatorCommands, psprintf( - "SET citus.enable_ddl_propagation TO OFF; ALTER SEQUENCE %s RENAME TO %s; RESET citus.enable_ddl_propagation;", - quote_identifier( - targetSequenceName), - quote_identifier( - sourceSequenceName))); - - /* update workers to use existing sequence and drop the new one generated by PG */ - bool missingTableOk = true; - *outWorkerCommands = lappend(*outWorkerCommands, - GetAlterColumnWithNextvalDefaultCmd( - sourceSequenceOid, sourceRelationId, - columnName, - missingTableOk)); - - - /* drop the sequence generated by identity column */ - *outWorkerCommands = lappend(*outWorkerCommands, psprintf( - "DROP SEQUENCE IF EXISTS %s", - quote_identifier( - targetSequenceName))); - } - } -} - - /* * ReplaceTable replaces the source table with the target table. * It moves all the rows of the source table to target table with INSERT SELECT. @@ -1686,24 +1599,6 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, ExecuteQueryViaSPI(query->data, SPI_OK_INSERT); } - /* - * Drop identity dependencies (sequences marked as DEPENDENCY_INTERNAL) on the workers - * to keep their states after the source table is dropped. - */ - List *ownedIdentitySequences = getOwnedSequences_internal(sourceId, 0, - DEPENDENCY_INTERNAL); - if (ownedIdentitySequences != NIL && ShouldSyncTableMetadata(sourceId)) - { - char *qualifiedTableName = quote_qualified_identifier(schemaName, sourceName); - StringInfo command = makeStringInfo(); - - appendStringInfo(command, - "SELECT pg_catalog.worker_drop_sequence_dependency(%s);", - quote_literal_cstr(qualifiedTableName)); - - SendCommandToWorkersWithMetadata(command->data); - } - /* * Modify regular sequence dependencies (sequences marked as DEPENDENCY_AUTO) */ @@ -1763,23 +1658,6 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, quote_qualified_identifier(schemaName, sourceName)))); } - /* - * We need to prepare rename identities commands before dropping the original table, - * otherwise we can't find the original names of the identity sequences. - * We prepare separate commands for the coordinator and the workers because: - * In the coordinator, we simply need to rename the identity sequences - * to their names on the old table, because right now the identity - * sequences have default names generated by Postgres with the creation of the new table - * In the workers, we have not dropped the original identity sequences, - * so what we do is we alter the columns and set their default to the - * original identity sequences, and after that we drop the new sequences. - */ - List *coordinatorCommandsToRenameIdentites = NIL; - List *workerCommandsToRenameIdentites = NIL; - PrepareRenameIdentitiesCommands(sourceId, targetId, - &coordinatorCommandsToRenameIdentites, - &workerCommandsToRenameIdentites); - resetStringInfo(query); appendStringInfo(query, "DROP %sTABLE %s CASCADE", IsForeignTable(sourceId) ? "FOREIGN " : "", @@ -1797,27 +1675,6 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, quote_qualified_identifier(schemaName, targetName), quote_identifier(sourceName)); ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY); - - char *coordinatorCommand = NULL; - foreach_ptr(coordinatorCommand, coordinatorCommandsToRenameIdentites) - { - ExecuteQueryViaSPI(coordinatorCommand, SPI_OK_UTILITY); - } - - char *workerCommand = NULL; - foreach_ptr(workerCommand, workerCommandsToRenameIdentites) - { - SendCommandToWorkersWithMetadata(workerCommand); - } - - /* - * To preserve identity sequences states in case of redistributing the table again, - * we don't drop them when we undistribute a table. To maintain consistency and - * avoid future problems if we redistribute the table, we want to apply all changes happening to - * the identity sequence in the coordinator to their corresponding sequences in the workers as well. - * That's why we have to mark identity sequences as distributed - */ - MarkIdentitiesAsDistributed(targetId); } diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 86133322d..a95c66fec 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -98,6 +98,7 @@ * once every LOG_PER_TUPLE_AMOUNT, the copy will be logged. */ #define LOG_PER_TUPLE_AMOUNT 1000000 +#define WORKER_MODIFY_IDENTITY_COMMAND "SELECT worker_modify_identity_columns(%s)" /* local function forward declarations */ static void CreateDistributedTableConcurrently(Oid relationId, @@ -160,6 +161,7 @@ static void EnsureColocateWithTableIsValid(Oid relationId, char distributionMeth char *distributionColumnName, char *colocateWithTableName); static void WarnIfTableHaveNoReplicaIdentity(Oid relationId); +static void MarkIdentitiesAsDistributed(Oid targetRelationId); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_create_distributed_table); @@ -222,6 +224,7 @@ create_distributed_table(PG_FUNCTION_ARGS) EnsureCitusTableCanBeCreated(relationId); + /* enable create_distributed_table on an empty node */ InsertCoordinatorIfClusterEmpty(); @@ -301,6 +304,8 @@ create_distributed_table_concurrently(PG_FUNCTION_ARGS) shardCountIsStrict = true; } + ErrorIfTableHasUnsupportedIdentityColumn(relationId); + CreateDistributedTableConcurrently(relationId, distributionColumnName, distributionMethod, colocateWithTableName, @@ -963,6 +968,8 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName, char distributionMethod, int shardCount, bool shardCountIsStrict, char *colocateWithTableName) { + ErrorIfTableHasUnsupportedIdentityColumn(relationId); + /* * EnsureTableNotDistributed errors out when relation is a citus table but * we don't want to ask user to first undistribute their citus local tables @@ -1157,6 +1164,8 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName, bool skip_validation = true; ExecuteForeignKeyCreateCommandList(originalForeignKeyRecreationCommands, skip_validation); + + MarkIdentitiesAsDistributed(relationId); } @@ -1796,6 +1805,52 @@ ErrorIfTableIsACatalogTable(Relation relation) } + + +/* + * This function marks all the identity sequences as distributed on the given table. + */ +static void +MarkIdentitiesAsDistributed(Oid targetRelationId) +{ + Relation relation = relation_open(targetRelationId, AccessShareLock); + TupleDesc tupleDescriptor = RelationGetDescr(relation); + relation_close(relation, NoLock); + + bool missingSequenceOk = false; + bool tableHasIdentityColumn = false; + for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts; + attributeIndex++) + { + Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex); + + if (attributeForm->attidentity) + { + tableHasIdentityColumn = true; + Oid seqOid = getIdentitySequence(targetRelationId, attributeForm->attnum, + missingSequenceOk); + + ObjectAddress seqAddress = { 0 }; + ObjectAddressSet(seqAddress, RelationRelationId, seqOid); + MarkObjectDistributed(&seqAddress); + } + } + + if (tableHasIdentityColumn) + { + StringInfo stringInfo = makeStringInfo(); + char *tableName = generate_qualified_relation_name(targetRelationId); + + appendStringInfo(stringInfo, + WORKER_MODIFY_IDENTITY_COMMAND, + quote_literal_cstr(tableName)); + SendCommandToWorkersWithMetadata(stringInfo->data); + } + +} + + + /* * EnsureLocalTableEmptyIfNecessary errors out if the function should be empty * according to ShouldLocalTableBeEmpty but it is not. diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 735449973..d09cacc19 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -370,7 +370,7 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency) bool creatingShellTableOnRemoteNode = true; List *tableDDLCommands = GetFullTableCreationCommands(relationId, WORKER_NEXTVAL_SEQUENCE_DEFAULTS, - INCLUDE_IDENTITY_AS_SEQUENCE_DEFAULTS, + INCLUDE_IDENTITY, creatingShellTableOnRemoteNode); TableDDLCommand *tableDDLCommand = NULL; foreach_ptr(tableDDLCommand, tableDDLCommands) diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index 39a652f10..a229bbb84 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -1378,29 +1378,6 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand, } } - /* - * We check for ADD COLUMN .. GENERATED .. AS IDENTITY expr - * since it uses a sequence as an internal dependency - * we should deparse the statement - */ - constraint = NULL; - foreach_ptr(constraint, columnConstraints) - { - if (constraint->contype == CONSTR_IDENTITY) - { - deparseAT = true; - useInitialDDLCommandString = false; - - /* - * Since we don't support constraints for AT_AddColumn - * we have to set is_not_null to true explicitly for identity columns - */ - ColumnDef *newColDef = copyObject(columnDefinition); - newColDef->constraints = NULL; - newColDef->is_not_null = true; - newCmd->def = (Node *) newColDef; - } - } /* * We check for ADD COLUMN .. SERIAL pseudo-type @@ -2539,34 +2516,6 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement) } } } - - /* - * We check for ADD COLUMN .. GENERATED AS IDENTITY expr - * since it uses a seqeunce as an internal dependency - */ - constraint = NULL; - foreach_ptr(constraint, columnConstraints) - { - if (constraint->contype == CONSTR_IDENTITY) - { - AttrNumber attnum = get_attnum(relationId, - columnDefinition->colname); - bool missing_ok = false; - Oid seqOid = getIdentitySequence(relationId, attnum, missing_ok); - - if (ShouldSyncTableMetadata(relationId)) - { - needMetadataSyncForNewSequences = true; - alterTableDefaultNextvalCmd = - GetAddColumnWithNextvalDefaultCmd(seqOid, - relationId, - columnDefinition - ->colname, - columnDefinition - ->typeName); - } - } - } } /* * We check for ALTER COLUMN .. SET DEFAULT nextval('user_defined_seq') @@ -3222,6 +3171,17 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement) { if (columnConstraint->contype == CONSTR_IDENTITY) { + /* + * We currently don't support adding an identity column for an MX table + */ + if (ShouldSyncTableMetadata(relationId)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg( + "cannot execute ADD COLUMN commands involving identity" + " columns when metadata is synchronized to workers"))); + } + /* * Currently we don't support backfilling the new identity column with default values * if the table is not empty @@ -4011,3 +3971,50 @@ MakeNameListFromRangeVar(const RangeVar *rel) return list_make1(makeString(rel->relname)); } } + + +/* + * ErrorIfTableHasUnsupportedIdentityColumn errors out if the given table has any identity column other than bigint identity column + */ +void +ErrorIfTableHasUnsupportedIdentityColumn(Oid relationId) +{ + Relation relation = relation_open(relationId, AccessShareLock); + TupleDesc tupleDescriptor = RelationGetDescr(relation); + relation_close(relation, NoLock); + + for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts; + attributeIndex++) + { + Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex); + + if (attributeForm->attidentity && attributeForm->atttypid != INT8OID) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot complete operation on a table with smallint/int identity column"))); + } + } +} + +/* + * ErrorIfTableHasIdentityColumn errors out if the given table has identity column + */ +void +ErrorIfTableHasIdentityColumn(Oid relationId) +{ + Relation relation = relation_open(relationId, AccessShareLock); + TupleDesc tupleDescriptor = RelationGetDescr(relation); + relation_close(relation, NoLock); + + for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts; + attributeIndex++) + { + Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex); + + if (attributeForm->attidentity) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot complete operation on a table with identity column"))); + } + } +} diff --git a/src/backend/distributed/deparser/citus_ruleutils.c b/src/backend/distributed/deparser/citus_ruleutils.c index ada77b098..2c8bf6e06 100644 --- a/src/backend/distributed/deparser/citus_ruleutils.c +++ b/src/backend/distributed/deparser/citus_ruleutils.c @@ -422,7 +422,6 @@ pg_get_tableschemadef_string(Oid tableRelationId, IncludeSequenceDefaults else if (includeIdentityDefaults == INCLUDE_IDENTITY) { Form_pg_sequence pgSequenceForm = pg_get_sequencedef(seqOid); - uint64 sequenceStart = nextval_internal(seqOid, false); char *sequenceDef = psprintf( " GENERATED %s AS IDENTITY (INCREMENT BY " INT64_FORMAT \ " MINVALUE " INT64_FORMAT " MAXVALUE " @@ -433,7 +432,8 @@ pg_get_tableschemadef_string(Oid tableRelationId, IncludeSequenceDefaults "ALWAYS" : "BY DEFAULT", pgSequenceForm->seqincrement, pgSequenceForm->seqmin, - pgSequenceForm->seqmax, sequenceStart, + pgSequenceForm->seqmax, + pgSequenceForm->seqstart, pgSequenceForm->seqcache, pgSequenceForm->seqcycle ? "" : "NO "); diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 6a5840f78..81fe4c698 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -1635,8 +1635,7 @@ GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList, attrdefResult = lappend_oid(attrdefResult, deprec->objid); attrdefAttnumResult = lappend_int(attrdefAttnumResult, deprec->refobjsubid); } - else if ((deprec->deptype == DEPENDENCY_AUTO || deprec->deptype == - DEPENDENCY_INTERNAL) && + else if (deprec->deptype == DEPENDENCY_AUTO && deprec->refobjsubid != 0 && deprec->classid == RelationRelationId && get_rel_relkind(deprec->objid) == RELKIND_SEQUENCE) @@ -2605,8 +2604,7 @@ CreateShellTableOnWorkers(Oid relationId) List *commandList = list_make1(DISABLE_DDL_PROPAGATION); IncludeSequenceDefaults includeSequenceDefaults = WORKER_NEXTVAL_SEQUENCE_DEFAULTS; - IncludeIdentities includeIdentityDefaults = - INCLUDE_IDENTITY_AS_SEQUENCE_DEFAULTS; + IncludeIdentities includeIdentityDefaults = INCLUDE_IDENTITY; bool creatingShellTableOnRemoteNode = true; List *tableDDLCommands = GetFullTableCreationCommands(relationId, diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index c3ec4fafb..e9eab1c35 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -566,6 +566,9 @@ extern bool ConstrTypeCitusCanDefaultName(ConstrType constrType); extern char * GetAlterColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId, char *colname, bool missingTableOk); +extern void ErrorIfTableHasUnsupportedIdentityColumn(Oid relationId); +extern void ErrorIfTableHasIdentityColumn(Oid relationId); + /* text_search.c - forward declarations */ extern List * GetCreateTextSearchConfigStatements(const ObjectAddress *address); extern List * GetCreateTextSearchDictionaryStatements(const ObjectAddress *address);