Introduce worker_modify_identity_columns() udf

issue/6694
Gokhan Gulbiz 2023-02-28 15:04:16 +03:00
parent afed03336d
commit 02eacd4113
No known key found for this signature in database
GPG Key ID: 608EF06B6BD1B45B
3 changed files with 49 additions and 2 deletions

View File

@ -1,4 +1,9 @@
-- citus--11.2-1--11.3-1
-- bump version to 11.3-1
CREATE FUNCTION worker_modify_identity_columns(regclass)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_modify_identity_columns$$;
COMMENT ON FUNCTION worker_modify_identity_columns(regclass)
IS 'modify identity columns to produce globally unique values';

View File

@ -1,2 +1,2 @@
-- citus--11.3-1--11.2-1
-- this is an empty downgrade path since citus--11.2-1--11.3-1.sql is empty for now
DROP FUNCTION worker_modify_identity_columns(regclass);

View File

@ -70,6 +70,7 @@ static void AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequence
PG_FUNCTION_INFO_V1(worker_apply_shard_ddl_command);
PG_FUNCTION_INFO_V1(worker_apply_inter_shard_ddl_command);
PG_FUNCTION_INFO_V1(worker_apply_sequence_command);
PG_FUNCTION_INFO_V1(worker_modify_identity_columns);
PG_FUNCTION_INFO_V1(worker_append_table_to_shard);
PG_FUNCTION_INFO_V1(worker_nextval);
@ -132,6 +133,47 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
/*
* worker_modify_identity_columns takes a table oid, runs an ALTER SEQUENCE statement
* for each identity column to adjust the minvalue and maxvalue of the sequence owned by
* identity column such that the sequence creates globally unique values.
*/
Datum
worker_modify_identity_columns(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid tableRelationId = PG_GETARG_OID(0);
Relation tableRelation = relation_open(tableRelationId, AccessShareLock);
TupleDesc tableTupleDesc = RelationGetDescr(tableRelation);
relation_close(tableRelation, NoLock);
bool missingSequenceOk = false;
for (int attributeIndex = 0; attributeIndex < tableTupleDesc->natts;
attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(tableTupleDesc,
attributeIndex);
if (attributeForm->attidentity)
{
Oid sequenceOid = getIdentitySequence(tableRelationId,
attributeForm->attnum,
missingSequenceOk);
Oid sequenceSchemaOid = get_rel_namespace(sequenceOid);
char *sequenceSchemaName = get_namespace_name(sequenceSchemaOid);
char *sequenceName = get_rel_name(sequenceOid);
Oid sequenceTypeId = pg_get_sequencedef(sequenceOid)->seqtypid;
AlterSequenceMinMax(sequenceOid, sequenceSchemaName, sequenceName, sequenceTypeId);
}
}
PG_RETURN_VOID();
}
/*
* worker_apply_sequence_command takes a CREATE SEQUENCE command string, runs the