From 02eacd411366fa7b3ea1f2d0daca217894976544 Mon Sep 17 00:00:00 2001 From: Gokhan Gulbiz Date: Tue, 28 Feb 2023 15:04:16 +0300 Subject: [PATCH] Introduce worker_modify_identity_columns() udf --- .../distributed/sql/citus--11.2-1--11.3-1.sql | 7 +++- .../sql/downgrades/citus--11.3-1--11.2-1.sql | 2 +- .../worker/worker_data_fetch_protocol.c | 42 +++++++++++++++++++ 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql index 981c5f375..f95b059d3 100644 --- a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql +++ b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql @@ -1,4 +1,9 @@ -- citus--11.2-1--11.3-1 --- bump version to 11.3-1 +CREATE FUNCTION worker_modify_identity_columns(regclass) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_modify_identity_columns$$; +COMMENT ON FUNCTION worker_modify_identity_columns(regclass) + IS 'modify identity columns to produce globally unique values'; diff --git a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql index 7d71235d7..28e267a92 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql @@ -1,2 +1,2 @@ -- citus--11.3-1--11.2-1 --- this is an empty downgrade path since citus--11.2-1--11.3-1.sql is empty for now +DROP FUNCTION worker_modify_identity_columns(regclass); diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index d563c443b..758b42343 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -70,6 +70,7 @@ static void AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequence PG_FUNCTION_INFO_V1(worker_apply_shard_ddl_command); PG_FUNCTION_INFO_V1(worker_apply_inter_shard_ddl_command); PG_FUNCTION_INFO_V1(worker_apply_sequence_command); +PG_FUNCTION_INFO_V1(worker_modify_identity_columns); PG_FUNCTION_INFO_V1(worker_append_table_to_shard); PG_FUNCTION_INFO_V1(worker_nextval); @@ -132,6 +133,47 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +/* + * worker_modify_identity_columns takes a table oid, runs an ALTER SEQUENCE statement + * for each identity column to adjust the minvalue and maxvalue of the sequence owned by + * identity column such that the sequence creates globally unique values. + */ +Datum +worker_modify_identity_columns(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + Oid tableRelationId = PG_GETARG_OID(0); + + Relation tableRelation = relation_open(tableRelationId, AccessShareLock); + TupleDesc tableTupleDesc = RelationGetDescr(tableRelation); + relation_close(tableRelation, NoLock); + + bool missingSequenceOk = false; + + for (int attributeIndex = 0; attributeIndex < tableTupleDesc->natts; + attributeIndex++) + { + Form_pg_attribute attributeForm = TupleDescAttr(tableTupleDesc, + attributeIndex); + + if (attributeForm->attidentity) + { + Oid sequenceOid = getIdentitySequence(tableRelationId, + attributeForm->attnum, + missingSequenceOk); + + Oid sequenceSchemaOid = get_rel_namespace(sequenceOid); + char *sequenceSchemaName = get_namespace_name(sequenceSchemaOid); + char *sequenceName = get_rel_name(sequenceOid); + Oid sequenceTypeId = pg_get_sequencedef(sequenceOid)->seqtypid; + + AlterSequenceMinMax(sequenceOid, sequenceSchemaName, sequenceName, sequenceTypeId); + } + } + + PG_RETURN_VOID(); +} /* * worker_apply_sequence_command takes a CREATE SEQUENCE command string, runs the