diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 5cc2e7b2b..3e52ca55b 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -98,8 +98,6 @@ * once every LOG_PER_TUPLE_AMOUNT, the copy will be logged. */ #define LOG_PER_TUPLE_AMOUNT 1000000 -#define WORKER_MODIFY_IDENTITY_COLUMNS \ - "SELECT pg_catalog.worker_modify_identity_columns(%s)" /* local function forward declarations */ static void CreateDistributedTableConcurrently(Oid relationId, @@ -168,7 +166,6 @@ static void EnsureColocateWithTableIsValid(Oid relationId, char distributionMeth char *distributionColumnName, char *colocateWithTableName); static void WarnIfTableHaveNoReplicaIdentity(Oid relationId); -static void DistributeIdentityColumns(Oid targetRelationId); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_create_distributed_table); @@ -1203,8 +1200,6 @@ CreateCitusTable(Oid relationId, char *distributionColumnName, bool skip_validation = true; ExecuteForeignKeyCreateCommandList(originalForeignKeyRecreationCommands, skip_validation); - - DistributeIdentityColumns(relationId); } @@ -1253,7 +1248,7 @@ EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid ownerRelationId foreach_oid(citusTableId, citusTableIdList) { List *seqInfoList = NIL; - GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0); + GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0, DEPENDENCY_AUTO); SequenceInfo *seqInfo = NULL; foreach_ptr(seqInfo, seqInfoList) @@ -1330,7 +1325,7 @@ EnsureRelationHasCompatibleSequenceTypes(Oid relationId) { List *seqInfoList = NIL; - GetDependentSequencesWithRelation(relationId, &seqInfoList, 0); + GetDependentSequencesWithRelation(relationId, &seqInfoList, 0, DEPENDENCY_AUTO); EnsureDistributedSequencesHaveOneType(relationId, seqInfoList); } @@ -1844,51 +1839,6 @@ ErrorIfTableIsACatalogTable(Relation relation) } -/* - * DistributeIdentityColumns is responsible for marking sequences depend on - * identity columns of a given table. If the table has any identity columns, - * this function executes a command on workers to modify the identity columns - * min/max values to produce unique values on workers. - */ -static void -DistributeIdentityColumns(Oid targetRelationId) -{ - Relation relation = relation_open(targetRelationId, AccessShareLock); - TupleDesc tupleDescriptor = RelationGetDescr(relation); - relation_close(relation, NoLock); - - bool missingSequenceOk = false; - bool tableHasIdentityColumn = false; - for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts; - attributeIndex++) - { - Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex); - - if (attributeForm->attidentity) - { - tableHasIdentityColumn = true; - Oid seqOid = getIdentitySequence(targetRelationId, attributeForm->attnum, - missingSequenceOk); - - ObjectAddress seqAddress = { 0 }; - ObjectAddressSet(seqAddress, RelationRelationId, seqOid); - MarkObjectDistributed(&seqAddress); - } - } - - if (tableHasIdentityColumn) - { - StringInfo stringInfo = makeStringInfo(); - char *tableName = generate_qualified_relation_name(targetRelationId); - - appendStringInfo(stringInfo, - WORKER_MODIFY_IDENTITY_COLUMNS, - quote_literal_cstr(tableName)); - SendCommandToWorkersWithMetadata(stringInfo->data); - } -} - - /* * EnsureLocalTableEmptyIfNecessary errors out if the function should be empty * according to ShouldLocalTableBeEmpty but it is not. diff --git a/src/backend/distributed/commands/sequence.c b/src/backend/distributed/commands/sequence.c index e8c217bb5..f1757bb62 100644 --- a/src/backend/distributed/commands/sequence.c +++ b/src/backend/distributed/commands/sequence.c @@ -33,7 +33,8 @@ /* Local functions forward declarations for helper functions */ static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId); -static Oid SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress); +static Oid SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress, char + depType); static List * FilterDistributedSequences(GrantStmt *stmt); @@ -183,7 +184,7 @@ ExtractDefaultColumnsAndOwnedSequences(Oid relationId, List **columnNameList, char *columnName = NameStr(attributeForm->attname); List *columnOwnedSequences = - getOwnedSequences_internal(relationId, attributeIndex + 1, 0); + getOwnedSequences_internal(relationId, attributeIndex + 1, DEPENDENCY_AUTO); if (attributeForm->atthasdef && list_length(columnOwnedSequences) == 0) { @@ -453,21 +454,22 @@ PreprocessAlterSequenceStmt(Node *node, const char *queryString, /* the code-path only supports a single object */ Assert(list_length(addresses) == 1); + /* We have already asserted that we have exactly 1 address in the addresses. */ + ObjectAddress *address = linitial(addresses); + /* error out if the sequence is distributed */ - if (IsAnyObjectDistributed(addresses)) + if (IsAnyObjectDistributed(addresses) || SequenceUsedInDistributedTable(address, + DEPENDENCY_INTERNAL)) { ereport(ERROR, (errmsg( "Altering a distributed sequence is currently not supported."))); } - /* We have already asserted that we have exactly 1 address in the addresses. */ - ObjectAddress *address = linitial(addresses); - /* * error out if the sequence is used in a distributed table * and this is an ALTER SEQUENCE .. AS .. statement */ - Oid citusTableId = SequenceUsedInDistributedTable(address); + Oid citusTableId = SequenceUsedInDistributedTable(address, DEPENDENCY_AUTO); if (citusTableId != InvalidOid) { List *options = stmt->options; @@ -497,16 +499,19 @@ PreprocessAlterSequenceStmt(Node *node, const char *queryString, * SequenceUsedInDistributedTable returns true if the argument sequence * is used as the default value of a column in a distributed table. * Returns false otherwise + * See DependencyType for the possible values of depType. + * We use DEPENDENCY_INTERNAL for sequences created by identity column. + * DEPENDENCY_AUTO for regular sequences. */ static Oid -SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress) +SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress, char depType) { List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); Oid citusTableId = InvalidOid; foreach_oid(citusTableId, citusTableIdList) { List *seqInfoList = NIL; - GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0); + GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0, depType); SequenceInfo *seqInfo = NULL; foreach_ptr(seqInfo, seqInfoList) { diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index 13d49c6f0..4c32ae50f 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -3315,7 +3315,8 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement) */ AttrNumber attnum = get_attnum(relationId, command->name); List *seqInfoList = NIL; - GetDependentSequencesWithRelation(relationId, &seqInfoList, attnum); + GetDependentSequencesWithRelation(relationId, &seqInfoList, attnum, + DEPENDENCY_AUTO); if (seqInfoList != NIL) { ereport(ERROR, (errmsg("cannot execute ALTER COLUMN TYPE .. command " diff --git a/src/backend/distributed/metadata/dependency.c b/src/backend/distributed/metadata/dependency.c index a67c8fed0..dffeb482a 100644 --- a/src/backend/distributed/metadata/dependency.c +++ b/src/backend/distributed/metadata/dependency.c @@ -1834,7 +1834,7 @@ static List * GetRelationSequenceDependencyList(Oid relationId) { List *seqInfoList = NIL; - GetDependentSequencesWithRelation(relationId, &seqInfoList, 0); + GetDependentSequencesWithRelation(relationId, &seqInfoList, 0, DEPENDENCY_AUTO); List *seqIdList = NIL; SequenceInfo *seqInfo = NULL; diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 41d08e43b..fcceaf6a2 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -1586,10 +1586,13 @@ GetAttributeTypeOid(Oid relationId, AttrNumber attnum) * For both cases, we use the intermediate AttrDefault object from pg_depend. * If attnum is specified, we only return the sequences related to that * attribute of the relationId. + * See DependencyType for the possible values of depType. + * We use DEPENDENCY_INTERNAL for sequences created by identity column. + * DEPENDENCY_AUTO for regular sequences. */ void GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList, - AttrNumber attnum) + AttrNumber attnum, char depType) { Assert(*seqInfoList == NIL); @@ -1626,7 +1629,7 @@ GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList, if (deprec->classid == AttrDefaultRelationId && deprec->objsubid == 0 && deprec->refobjsubid != 0 && - deprec->deptype == DEPENDENCY_AUTO) + deprec->deptype == depType) { /* * We are going to generate corresponding SequenceInfo @@ -1635,7 +1638,7 @@ GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList, attrdefResult = lappend_oid(attrdefResult, deprec->objid); attrdefAttnumResult = lappend_int(attrdefAttnumResult, deprec->refobjsubid); } - else if (deprec->deptype == DEPENDENCY_AUTO && + else if (deprec->deptype == depType && deprec->refobjsubid != 0 && deprec->classid == RelationRelationId && get_rel_relkind(deprec->objid) == RELKIND_SEQUENCE) @@ -1882,6 +1885,51 @@ SequenceDependencyCommandList(Oid relationId) } +/* + * IdentitySequenceDependencyCommandList generate a command to execute a UDF (WORKER_MODIFY_IDENTITY_COLUMNS) on workers + * to modify the identity columns min/max values to produce unique values on workers. + */ +List * +IdentitySequenceDependencyCommandList(Oid targetRelationId) +{ + List *commandList = NIL; + + Relation relation = relation_open(targetRelationId, AccessShareLock); + TupleDesc tupleDescriptor = RelationGetDescr(relation); + relation_close(relation, NoLock); + + bool tableHasIdentityColumn = false; + for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts; + attributeIndex++) + { + Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex); + + if (attributeForm->attidentity) + { + tableHasIdentityColumn = true; + break; + } + } + + if (tableHasIdentityColumn) + { + StringInfo stringInfo = makeStringInfo(); + char *tableName = generate_qualified_relation_name(targetRelationId); + + appendStringInfo(stringInfo, + WORKER_MODIFY_IDENTITY_COLUMNS, + quote_literal_cstr(tableName)); + + + commandList = lappend(commandList, + makeTableDDLCommandString( + stringInfo->data)); + } + + return commandList; +} + + /* * CreateSequenceDependencyCommand generates a query string for calling * worker_record_sequence_dependency on the worker to recreate a sequence->table diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index 77da0b630..dca9906a6 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -497,6 +497,15 @@ GetFullTableCreationCommands(Oid relationId, tableDDLEventList = lappend(tableDDLEventList, truncateTriggerCommand); } + + /* + * For identity column sequences, we only need to modify + * their min/max values to produce unique values on the worker nodes. + */ + List *identitySequenceDependencyCommandList = + IdentitySequenceDependencyCommandList(relationId); + tableDDLEventList = list_concat(tableDDLEventList, + identitySequenceDependencyCommandList); } tableDDLEventList = list_concat(tableDDLEventList, postLoadCreationCommandList); diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 9774f0fd4..8bace7eeb 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -138,6 +138,7 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS) * worker_modify_identity_columns takes a table oid, runs an ALTER SEQUENCE statement * for each identity column to adjust the minvalue and maxvalue of the sequence owned by * identity column such that the sequence creates globally unique values. + * We use table oid instead of sequence name to avoid any potential conflicts between sequences of different tables. This way, we can safely iterate through identity columns on a specific table without any issues. While this may introduce a small amount of business logic to workers, it's a much safer approach overall. */ Datum worker_modify_identity_columns(PG_FUNCTION_ARGS) diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index e06b5268f..121d15db3 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -101,11 +101,12 @@ extern void SyncNodeMetadataToNodesMain(Datum main_arg); extern void SignalMetadataSyncDaemon(Oid database, int sig); extern bool ShouldInitiateMetadataSync(bool *lockFailure); extern List * SequenceDependencyCommandList(Oid relationId); +extern List * IdentitySequenceDependencyCommandList(Oid targetRelationId); extern List * DDLCommandsForSequence(Oid sequenceOid, char *ownerName); extern List * GetSequencesFromAttrDef(Oid attrdefOid); extern void GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList, - AttrNumber attnum); + AttrNumber attnum, char depType); extern List * GetDependentFunctionsWithRelation(Oid relationId); extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum); extern void SetLocalEnableMetadataSync(bool state); @@ -146,6 +147,8 @@ extern void SyncDeleteColocationGroupToNodes(uint32 colocationId); "placementid = EXCLUDED.placementid" #define METADATA_SYNC_CHANNEL "metadata_sync" +#define WORKER_MODIFY_IDENTITY_COLUMNS \ + "SELECT pg_catalog.worker_modify_identity_columns(%s)" /* controlled via GUC */ extern char *EnableManualMetadataChangesForUser;