mirror of https://github.com/citusdata/citus.git
Rename sequence instead of drop
parent
524ee83a9a
commit
94ccc8b7b5
|
@ -656,3 +656,45 @@ PostprocessAlterSequenceOwnerStmt(Node *node, const char *queryString)
|
||||||
|
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GenerateBackupNameForSequenceCollision generates a new sequence name for an existing
|
||||||
|
* sequence. The name is generated in such a way that the new name doesn't overlap with
|
||||||
|
* an existing relation by adding a suffix with incrementing number after the new name.
|
||||||
|
*/
|
||||||
|
char *
|
||||||
|
GenerateBackupNameForSequenceCollision(const ObjectAddress *address)
|
||||||
|
{
|
||||||
|
char *newName = palloc0(NAMEDATALEN);
|
||||||
|
char suffix[NAMEDATALEN] = { 0 };
|
||||||
|
int count = 0;
|
||||||
|
char *namespaceName = get_namespace_name(get_rel_namespace(address->objectId));
|
||||||
|
Oid schemaId = get_namespace_oid(namespaceName, false);
|
||||||
|
|
||||||
|
char *baseName = get_rel_name(address->objectId);
|
||||||
|
int baseLength = strlen(baseName);
|
||||||
|
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
int suffixLength = SafeSnprintf(suffix, NAMEDATALEN - 1, "(citus_backup_%d)",
|
||||||
|
count);
|
||||||
|
|
||||||
|
/* trim the base name at the end to leave space for the suffix and trailing \0 */
|
||||||
|
baseLength = Min(baseLength, NAMEDATALEN - suffixLength - 1);
|
||||||
|
|
||||||
|
/* clear newName before copying the potentially trimmed baseName and suffix */
|
||||||
|
memset(newName, 0, NAMEDATALEN);
|
||||||
|
strncpy_s(newName, NAMEDATALEN, baseName, baseLength);
|
||||||
|
strncpy_s(newName + baseLength, NAMEDATALEN - baseLength, suffix,
|
||||||
|
suffixLength);
|
||||||
|
|
||||||
|
Oid typeOid = get_relname_relid(newName, schemaId);
|
||||||
|
if (typeOid == InvalidOid)
|
||||||
|
{
|
||||||
|
return newName;
|
||||||
|
}
|
||||||
|
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -33,8 +33,6 @@
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
|
|
||||||
static const char * CreateStmtByObjectAddress(const ObjectAddress *address);
|
static const char * CreateStmtByObjectAddress(const ObjectAddress *address);
|
||||||
static RenameStmt * CreateRenameStatement(const ObjectAddress *address, char *newName);
|
|
||||||
static char * GenerateBackupNameForCollision(const ObjectAddress *address);
|
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(worker_create_or_replace_object);
|
PG_FUNCTION_INFO_V1(worker_create_or_replace_object);
|
||||||
|
|
||||||
|
@ -166,7 +164,7 @@ CreateStmtByObjectAddress(const ObjectAddress *address)
|
||||||
* address. This name should be used when renaming an existing object before creating the
|
* address. This name should be used when renaming an existing object before creating the
|
||||||
* new object locally on the worker.
|
* new object locally on the worker.
|
||||||
*/
|
*/
|
||||||
static char *
|
char *
|
||||||
GenerateBackupNameForCollision(const ObjectAddress *address)
|
GenerateBackupNameForCollision(const ObjectAddress *address)
|
||||||
{
|
{
|
||||||
switch (getObjectClass(address))
|
switch (getObjectClass(address))
|
||||||
|
@ -186,6 +184,15 @@ GenerateBackupNameForCollision(const ObjectAddress *address)
|
||||||
return GenerateBackupNameForTypeCollision(address);
|
return GenerateBackupNameForTypeCollision(address);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case OCLASS_CLASS:
|
||||||
|
{
|
||||||
|
char relKind = get_rel_relkind(address->objectId);
|
||||||
|
if (relKind == RELKIND_SEQUENCE)
|
||||||
|
{
|
||||||
|
return GenerateBackupNameForSequenceCollision(address);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("unsupported object to construct a rename statement"),
|
ereport(ERROR, (errmsg("unsupported object to construct a rename statement"),
|
||||||
|
@ -243,6 +250,7 @@ CreateRenameTypeStmt(const ObjectAddress *address, char *newName)
|
||||||
address->objectId));
|
address->objectId));
|
||||||
stmt->newname = newName;
|
stmt->newname = newName;
|
||||||
|
|
||||||
|
|
||||||
return stmt;
|
return stmt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -265,11 +273,43 @@ CreateRenameProcStmt(const ObjectAddress *address, char *newName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreateRenameSequenceStmt creates a rename statement for a sequence based on its
|
||||||
|
* ObjectAddress. The rename statement will rename the existing object on its address
|
||||||
|
* to the value provided in newName.
|
||||||
|
*/
|
||||||
|
static RenameStmt *
|
||||||
|
CreateRenameSequenceStmt(const ObjectAddress *address, char *newName)
|
||||||
|
{
|
||||||
|
RenameStmt *stmt = makeNode(RenameStmt);
|
||||||
|
Oid seqOid = address->objectId;
|
||||||
|
|
||||||
|
HeapTuple seqClassTuple = SearchSysCache1(RELOID, seqOid);
|
||||||
|
if (!HeapTupleIsValid(seqClassTuple))
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("citus cache lookup error")));
|
||||||
|
}
|
||||||
|
Form_pg_class seqClassForm = (Form_pg_class) GETSTRUCT(seqClassTuple);
|
||||||
|
|
||||||
|
char *schemaName = get_namespace_name(seqClassForm->relnamespace);
|
||||||
|
char *seqName = NameStr(seqClassForm->relname);
|
||||||
|
List *name = list_make2(makeString(schemaName), makeString(seqName));
|
||||||
|
ReleaseSysCache(seqClassTuple);
|
||||||
|
|
||||||
|
stmt->renameType = OBJECT_SEQUENCE;
|
||||||
|
stmt->object = (Node *) name;
|
||||||
|
stmt->relation = makeRangeVar(schemaName, seqName, -1);
|
||||||
|
stmt->newname = newName;
|
||||||
|
|
||||||
|
return stmt;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CreateRenameStatement creates a rename statement for an existing object to rename the
|
* CreateRenameStatement creates a rename statement for an existing object to rename the
|
||||||
* object to newName.
|
* object to newName.
|
||||||
*/
|
*/
|
||||||
static RenameStmt *
|
RenameStmt *
|
||||||
CreateRenameStatement(const ObjectAddress *address, char *newName)
|
CreateRenameStatement(const ObjectAddress *address, char *newName)
|
||||||
{
|
{
|
||||||
switch (getObjectClass(address))
|
switch (getObjectClass(address))
|
||||||
|
@ -289,6 +329,15 @@ CreateRenameStatement(const ObjectAddress *address, char *newName)
|
||||||
return CreateRenameTypeStmt(address, newName);
|
return CreateRenameTypeStmt(address, newName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case OCLASS_CLASS:
|
||||||
|
{
|
||||||
|
char relKind = get_rel_relkind(address->objectId);
|
||||||
|
if (relKind == RELKIND_SEQUENCE)
|
||||||
|
{
|
||||||
|
return CreateRenameSequenceStmt(address, newName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("unsupported object to construct a rename statement"),
|
ereport(ERROR, (errmsg("unsupported object to construct a rename statement"),
|
||||||
|
|
|
@ -32,6 +32,7 @@
|
||||||
#include "distributed/commands/utility_hook.h"
|
#include "distributed/commands/utility_hook.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/coordinator_protocol.h"
|
#include "distributed/coordinator_protocol.h"
|
||||||
|
#include "distributed/deparser.h"
|
||||||
#include "distributed/intermediate_results.h"
|
#include "distributed/intermediate_results.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
|
@ -44,6 +45,7 @@
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
|
|
||||||
|
#include "distributed/worker_create_or_replace.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
#include "distributed/version_compat.h"
|
#include "distributed/version_compat.h"
|
||||||
#include "executor/spi.h"
|
#include "executor/spi.h"
|
||||||
|
@ -480,13 +482,16 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS)
|
||||||
Form_pg_sequence pgSequenceForm = pg_get_sequencedef(sequenceOid);
|
Form_pg_sequence pgSequenceForm = pg_get_sequencedef(sequenceOid);
|
||||||
if (pgSequenceForm->seqtypid != sequenceTypeId)
|
if (pgSequenceForm->seqtypid != sequenceTypeId)
|
||||||
{
|
{
|
||||||
StringInfo dropSequenceString = makeStringInfo();
|
ObjectAddress sequenceAddress = { 0 };
|
||||||
char *qualifiedSequenceName = quote_qualified_identifier(sequenceSchema,
|
ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid);
|
||||||
sequenceName);
|
|
||||||
appendStringInfoString(dropSequenceString, "DROP SEQUENCE ");
|
char *newName = GenerateBackupNameForCollision(&sequenceAddress);
|
||||||
appendStringInfoString(dropSequenceString, qualifiedSequenceName);
|
|
||||||
appendStringInfoString(dropSequenceString, ";");
|
RenameStmt *renameStmt = CreateRenameStatement(&sequenceAddress, newName);
|
||||||
ExecuteQueryViaSPI(dropSequenceString->data, SPI_OK_UTILITY);
|
const char *sqlRenameStmt = DeparseTreeNode((Node *) renameStmt);
|
||||||
|
ProcessUtilityParseTree((Node *) renameStmt, sqlRenameStmt,
|
||||||
|
PROCESS_UTILITY_QUERY,
|
||||||
|
NULL, None_Receiver, NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -495,8 +500,8 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS)
|
||||||
None_Receiver, NULL);
|
None_Receiver, NULL);
|
||||||
CommandCounterIncrement();
|
CommandCounterIncrement();
|
||||||
|
|
||||||
sequenceRelationId = RangeVarGetRelid(createSequenceStatement->sequence,
|
Oid sequenceRelationId = RangeVarGetRelid(createSequenceStatement->sequence,
|
||||||
AccessShareLock, false);
|
AccessShareLock, false);
|
||||||
Assert(sequenceRelationId != InvalidOid);
|
Assert(sequenceRelationId != InvalidOid);
|
||||||
|
|
||||||
AlterSequenceMinMax(sequenceRelationId, sequenceSchema, sequenceName, sequenceTypeId);
|
AlterSequenceMinMax(sequenceRelationId, sequenceSchema, sequenceName, sequenceTypeId);
|
||||||
|
|
|
@ -397,6 +397,7 @@ extern ObjectAddress AlterSequenceOwnerStmtObjectAddress(Node *node, bool missin
|
||||||
extern ObjectAddress RenameSequenceStmtObjectAddress(Node *node, bool missing_ok);
|
extern ObjectAddress RenameSequenceStmtObjectAddress(Node *node, bool missing_ok);
|
||||||
extern void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt);
|
extern void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt);
|
||||||
extern void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt);
|
extern void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt);
|
||||||
|
extern char * GenerateBackupNameForSequenceCollision(const ObjectAddress *address);
|
||||||
|
|
||||||
/* statistics.c - forward declarations */
|
/* statistics.c - forward declarations */
|
||||||
extern List * PreprocessCreateStatisticsStmt(Node *node, const char *queryString,
|
extern List * PreprocessCreateStatisticsStmt(Node *node, const char *queryString,
|
||||||
|
|
|
@ -14,8 +14,12 @@
|
||||||
#ifndef WORKER_CREATE_OR_REPLACE_H
|
#ifndef WORKER_CREATE_OR_REPLACE_H
|
||||||
#define WORKER_CREATE_OR_REPLACE_H
|
#define WORKER_CREATE_OR_REPLACE_H
|
||||||
|
|
||||||
|
#include "catalog/objectaddress.h"
|
||||||
|
|
||||||
#define CREATE_OR_REPLACE_COMMAND "SELECT worker_create_or_replace_object(%s);"
|
#define CREATE_OR_REPLACE_COMMAND "SELECT worker_create_or_replace_object(%s);"
|
||||||
|
|
||||||
extern char * WrapCreateOrReplace(const char *sql);
|
extern char * WrapCreateOrReplace(const char *sql);
|
||||||
|
extern char * GenerateBackupNameForCollision(const ObjectAddress *address);
|
||||||
|
extern RenameStmt * CreateRenameStatement(const ObjectAddress *address, char *newName);
|
||||||
|
|
||||||
#endif /* WORKER_CREATE_OR_REPLACE_H */
|
#endif /* WORKER_CREATE_OR_REPLACE_H */
|
||||||
|
|
Loading…
Reference in New Issue