From 0aaa79828f9485626e35ace1d7f887f619ef9c3c Mon Sep 17 00:00:00 2001 From: Burak Velioglu Date: Fri, 28 Jan 2022 16:47:08 +0300 Subject: [PATCH] Use rename function --- src/backend/distributed/commands/sequence.c | 34 +++++++++++++++++++ .../worker/worker_data_fetch_protocol.c | 29 ++++------------ src/include/distributed/commands.h | 2 ++ src/include/distributed/worker_manager.h | 2 +- 4 files changed, 43 insertions(+), 24 deletions(-) diff --git a/src/backend/distributed/commands/sequence.c b/src/backend/distributed/commands/sequence.c index 674ced9f5..4aa04b2ef 100644 --- a/src/backend/distributed/commands/sequence.c +++ b/src/backend/distributed/commands/sequence.c @@ -11,6 +11,7 @@ #include "postgres.h" +#include "access/xact.h" #include "catalog/dependency.h" #include "catalog/namespace.h" #include "commands/defrem.h" @@ -23,6 +24,7 @@ #include "distributed/metadata/distobject.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" +#include "distributed/worker_create_or_replace.h" #include "nodes/parsenodes.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -698,3 +700,35 @@ GenerateBackupNameForSequenceCollision(const ObjectAddress *address) count++; } } + + +/* + * RenameExistingSequenceWithDifferentTypeIfExists renames the sequence's type if + * that sequence exists and the desired sequence type is different than it's type. + */ +void +RenameExistingSequenceWithDifferentTypeIfExists(RangeVar *sequence, Oid desiredSeqTypeId) +{ + Oid sequenceOid; + RangeVarGetAndCheckCreationNamespace(sequence, NoLock, &sequenceOid); + + if (OidIsValid(sequenceOid)) + { + Form_pg_sequence pgSequenceForm = pg_get_sequencedef(sequenceOid); + if (pgSequenceForm->seqtypid != desiredSeqTypeId) + { + ObjectAddress sequenceAddress = { 0 }; + ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid); + + char *newName = GenerateBackupNameForCollision(&sequenceAddress); + + RenameStmt *renameStmt = CreateRenameStatement(&sequenceAddress, newName); + const char *sqlRenameStmt = DeparseTreeNode((Node *) renameStmt); + ProcessUtilityParseTree((Node *) renameStmt, sqlRenameStmt, + PROCESS_UTILITY_QUERY, + NULL, None_Receiver, NULL); + + CommandCounterIncrement(); + } + } +} diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index ae8898f6b..8b55d5f93 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -28,6 +28,7 @@ #include "commands/extension.h" #include "commands/sequence.h" #include "distributed/citus_ruleutils.h" +#include "distributed/commands.h" #include "distributed/commands/multi_copy.h" #include "distributed/commands/utility_hook.h" #include "distributed/connection_management.h" @@ -470,30 +471,9 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS) * stayed on that node after a rollbacked create_distributed_table operation. * We must change it's name first to create the sequence with the correct type. */ - Oid sequenceOid; CreateSeqStmt *createSequenceStatement = (CreateSeqStmt *) commandNode; - char *sequenceName = createSequenceStatement->sequence->relname; - char *sequenceSchema = createSequenceStatement->sequence->schemaname; - - RangeVarGetAndCheckCreationNamespace(createSequenceStatement->sequence, NoLock, - &sequenceOid); - if (OidIsValid(sequenceOid)) - { - Form_pg_sequence pgSequenceForm = pg_get_sequencedef(sequenceOid); - if (pgSequenceForm->seqtypid != sequenceTypeId) - { - ObjectAddress sequenceAddress = { 0 }; - ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid); - - char *newName = GenerateBackupNameForCollision(&sequenceAddress); - - RenameStmt *renameStmt = CreateRenameStatement(&sequenceAddress, newName); - const char *sqlRenameStmt = DeparseTreeNode((Node *) renameStmt); - ProcessUtilityParseTree((Node *) renameStmt, sqlRenameStmt, - PROCESS_UTILITY_QUERY, - NULL, None_Receiver, NULL); - } - } + RenameExistingSequenceWithDifferentTypeIfExists(createSequenceStatement->sequence, + sequenceTypeId); /* run the CREATE SEQUENCE command */ ProcessUtilityParseTree(commandNode, commandString, PROCESS_UTILITY_QUERY, NULL, @@ -502,6 +482,9 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS) Oid sequenceRelationId = RangeVarGetRelid(createSequenceStatement->sequence, AccessShareLock, false); + char *sequenceName = createSequenceStatement->sequence->relname; + char *sequenceSchema = createSequenceStatement->sequence->schemaname; + Assert(sequenceRelationId != InvalidOid); AlterSequenceMinMax(sequenceRelationId, sequenceSchema, sequenceName, sequenceTypeId); diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index f415a8866..41141ee8a 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -398,6 +398,8 @@ extern ObjectAddress RenameSequenceStmtObjectAddress(Node *node, bool missing_ok extern void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt); extern void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt); extern char * GenerateBackupNameForSequenceCollision(const ObjectAddress *address); +extern void RenameExistingSequenceWithDifferentTypeIfExists(RangeVar *sequence, + Oid desiredSeqTypeId); /* statistics.c - forward declarations */ extern List * PreprocessCreateStatisticsStmt(Node *node, const char *queryString, diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 7e8aecb61..91d91a880 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -104,7 +104,7 @@ extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnI Datum value); extern uint32 CountPrimariesWithMetadata(void); extern WorkerNode * GetFirstPrimaryWorkerNode(void); -extern List * SyncObjectDependenciesCommandList(WorkerNode *workerNode); +extern List * SyncDistributedObjectsCommandList(WorkerNode *workerNode); extern List * PgDistTableMetadataSyncCommandList(void); /* Function declarations for worker node utilities */