mirror of https://github.com/citusdata/citus.git
Fix: rename remote type on conflict (#2983)
DESCRIPTION: Rename remote types during type propagation To prevent data to be destructed when a remote type differs from the type on the coordinator during type propagation we wanted to rename the type instead of `DROP CASCADE`. This patch removes the `DROP` logic and adds the creation of a rename statement to a free name.pull/2987/head
parent
0a3152d09c
commit
2b7f5552c8
|
@ -1150,41 +1150,6 @@ AlterTypeOwnerObjectAddress(AlterOwnerStmt *stmt, bool missing_ok)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateDropStmtBasedOnCompositeTypeStmt returns, given a CREATE TYPE statement, a
|
||||
* corresponding statement to drop the type that is to be created. The type does not need
|
||||
* to exists in this postgres for this function to succeed.
|
||||
*/
|
||||
DropStmt *
|
||||
CreateDropStmtBasedOnCompositeTypeStmt(CompositeTypeStmt *stmt)
|
||||
{
|
||||
List *names = MakeNameListFromRangeVar(stmt->typevar);
|
||||
TypeName *typeName = makeTypeNameFromNameList(names);
|
||||
|
||||
DropStmt *dropStmt = makeNode(DropStmt);
|
||||
dropStmt->removeType = OBJECT_TYPE;
|
||||
dropStmt->objects = list_make1(typeName);
|
||||
return dropStmt;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateDropStmtBasedOnEnumStmt returns, given a CREATE TYPE ... AS ENUM statement, a
|
||||
* corresponding statement to drop the type that is to be created. The type does not need
|
||||
* to exists in this postgres for this function to succeed.
|
||||
*/
|
||||
DropStmt *
|
||||
CreateDropStmtBasedOnEnumStmt(CreateEnumStmt *stmt)
|
||||
{
|
||||
TypeName *typeName = makeTypeNameFromNameList(stmt->typeName);
|
||||
|
||||
DropStmt *dropStmt = makeNode(DropStmt);
|
||||
dropStmt->removeType = OBJECT_TYPE;
|
||||
dropStmt->objects = list_make1(typeName);
|
||||
return dropStmt;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateTypeDDLCommandsIdempotent returns a list of DDL statements (const char *) to be
|
||||
* executed on a node to recreate the type addressed by the typeAddress.
|
||||
|
@ -1229,6 +1194,76 @@ CreateTypeDDLCommandsIdempotent(const ObjectAddress *typeAddress)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* GenerateBackupNameForTypeCollision generates a new type name for an existing type. The
|
||||
* name is generated in such a way that the new name doesn't overlap with an existing type
|
||||
* by adding a postfix with incrementing number after the new name.
|
||||
*/
|
||||
char *
|
||||
GenerateBackupNameForTypeCollision(const ObjectAddress *address)
|
||||
{
|
||||
List *names = stringToQualifiedNameList(format_type_be_qualified(address->objectId));
|
||||
RangeVar *rel = makeRangeVarFromNameList(names);
|
||||
|
||||
char newName[NAMEDATALEN] = { 0 };
|
||||
char postfix[NAMEDATALEN] = { 0 };
|
||||
char *baseName = rel->relname;
|
||||
int count = 0;
|
||||
|
||||
while (true)
|
||||
{
|
||||
int postfixLength = snprintf(postfix, NAMEDATALEN - 1, "(citus_backup_%d)",
|
||||
count);
|
||||
int baseLength = strlen(baseName);
|
||||
TypeName *newTypeName = NULL;
|
||||
Oid typeOid = InvalidOid;
|
||||
|
||||
/* trim the base name at the end to leave space for the postfix and trailing \0 */
|
||||
baseLength = Min(baseLength, NAMEDATALEN - postfixLength - 1);
|
||||
|
||||
/* clear newName before copying the potentially trimmed baseName and postfix */
|
||||
memset(newName, 0, NAMEDATALEN);
|
||||
strncpy(newName, baseName, baseLength);
|
||||
strncpy(newName + baseLength, postfix, postfixLength);
|
||||
|
||||
rel->relname = newName;
|
||||
newTypeName = makeTypeNameFromNameList(MakeNameListFromRangeVar(rel));
|
||||
|
||||
typeOid = LookupTypeNameOid(NULL, newTypeName, true);
|
||||
if (typeOid == InvalidOid)
|
||||
{
|
||||
/*
|
||||
* Typename didn't exist yet.
|
||||
* Need to pstrdup the name as it was stack allocated during calculations.
|
||||
*/
|
||||
return pstrdup(newName);
|
||||
}
|
||||
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateRenameTypeStmt creates a rename statement for a type based on its ObjectAddress.
|
||||
* The rename statement will rename the existing object on its address to the value
|
||||
* provided in newName.
|
||||
*/
|
||||
RenameStmt *
|
||||
CreateRenameTypeStmt(const ObjectAddress *address, char *newName)
|
||||
{
|
||||
RenameStmt *stmt = NULL;
|
||||
|
||||
stmt = makeNode(RenameStmt);
|
||||
stmt->renameType = OBJECT_TYPE;
|
||||
stmt->object = (Node *) stringToQualifiedNameList(format_type_be_qualified(
|
||||
address->objectId));
|
||||
stmt->newname = newName;
|
||||
|
||||
return stmt;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* WrapCreateOrReplace takes a sql CREATE command and wraps it in a call to citus' udf to
|
||||
* create or replace the existing object based on its create command.
|
||||
|
|
|
@ -28,8 +28,8 @@
|
|||
#include "distributed/worker_protocol.h"
|
||||
|
||||
static Node * CreateStmtByObjectAddress(const ObjectAddress *address);
|
||||
static DropStmt * CreateDropStmtBasedOnCreateStmt(Node *createStmt);
|
||||
|
||||
static RenameStmt * CreateRenameStatement(const ObjectAddress *address, char *newName);
|
||||
static char * GenerateBackupNameForCollision(const ObjectAddress *address);
|
||||
|
||||
PG_FUNCTION_INFO_V1(worker_create_or_replace_object);
|
||||
|
||||
|
@ -70,7 +70,9 @@ worker_create_or_replace_object(PG_FUNCTION_ARGS)
|
|||
{
|
||||
Node *localCreateStmt = NULL;
|
||||
const char *localSqlStatement = NULL;
|
||||
DropStmt *dropStmtParseTree = NULL;
|
||||
char *newName = NULL;
|
||||
RenameStmt *renameStmt = NULL;
|
||||
const char *sqlRenameStmt = NULL;
|
||||
|
||||
localCreateStmt = CreateStmtByObjectAddress(address);
|
||||
localSqlStatement = DeparseTreeNode(localCreateStmt);
|
||||
|
@ -92,26 +94,12 @@ worker_create_or_replace_object(PG_FUNCTION_ARGS)
|
|||
PG_RETURN_BOOL(false);
|
||||
}
|
||||
|
||||
/* TODO don't drop, instead rename as described in documentation */
|
||||
newName = GenerateBackupNameForCollision(address);
|
||||
|
||||
/*
|
||||
* there might be dependencies left on the worker on this type, these are not
|
||||
* managed by citus anyway so it should be ok to drop, thus we cascade to any such
|
||||
* dependencies
|
||||
*/
|
||||
dropStmtParseTree = CreateDropStmtBasedOnCreateStmt(parseTree);
|
||||
|
||||
if (dropStmtParseTree != NULL)
|
||||
{
|
||||
const char *sqlDropStmt = NULL;
|
||||
|
||||
/* force the drop */
|
||||
dropStmtParseTree->behavior = DROP_CASCADE;
|
||||
|
||||
sqlDropStmt = DeparseTreeNode((Node *) dropStmtParseTree);
|
||||
CitusProcessUtility((Node *) dropStmtParseTree, sqlDropStmt,
|
||||
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
|
||||
}
|
||||
renameStmt = CreateRenameStatement(address, newName);
|
||||
sqlRenameStmt = DeparseTreeNode((Node *) renameStmt);
|
||||
CitusProcessUtility((Node *) renameStmt, sqlRenameStmt, PROCESS_UTILITY_TOPLEVEL,
|
||||
NULL, None_Receiver, NULL);
|
||||
}
|
||||
|
||||
/* apply create statement locally */
|
||||
|
@ -149,32 +137,49 @@ CreateStmtByObjectAddress(const ObjectAddress *address)
|
|||
}
|
||||
|
||||
|
||||
/* TODO will be removed as we will not drop but rename instead */
|
||||
static DropStmt *
|
||||
CreateDropStmtBasedOnCreateStmt(Node *createStmt)
|
||||
/*
|
||||
* GenerateBackupNameForCollision calculate a backup name for a given object by its
|
||||
* address. This name should be used when renaming an existing object before creating the
|
||||
* new object locally on the worker.
|
||||
*/
|
||||
static char *
|
||||
GenerateBackupNameForCollision(const ObjectAddress *address)
|
||||
{
|
||||
switch (nodeTag(createStmt))
|
||||
switch (getObjectClass(address))
|
||||
{
|
||||
case T_CompositeTypeStmt:
|
||||
case OCLASS_TYPE:
|
||||
{
|
||||
return CreateDropStmtBasedOnCompositeTypeStmt(
|
||||
castNode(CompositeTypeStmt, createStmt));
|
||||
}
|
||||
|
||||
case T_CreateEnumStmt:
|
||||
{
|
||||
return CreateDropStmtBasedOnEnumStmt(castNode(CreateEnumStmt, createStmt));
|
||||
return GenerateBackupNameForTypeCollision(address);
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
/*
|
||||
* should not be reached, indicates the coordinator is sending unsupported
|
||||
* statements
|
||||
*/
|
||||
ereport(ERROR, (errmsg("unsupported statement to transform to a drop stmt"),
|
||||
errhint("The coordinator send an unsupported command to the "
|
||||
"worker")));
|
||||
ereport(ERROR, (errmsg("unsupported object to construct a rename statement"),
|
||||
errdetail(
|
||||
"unable to generate a backup name for the old type")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateRenameStatement creates a rename statement for an existing object to rename the
|
||||
* object to newName.
|
||||
*/
|
||||
static RenameStmt *
|
||||
CreateRenameStatement(const ObjectAddress *address, char *newName)
|
||||
{
|
||||
switch (getObjectClass(address))
|
||||
{
|
||||
case OCLASS_TYPE:
|
||||
{
|
||||
return CreateRenameTypeStmt(address, newName);
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
ereport(ERROR, (errmsg("unsupported object to construct a rename statement"),
|
||||
errdetail("unable to generate a parsetree for the rename")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -155,9 +155,9 @@ extern const ObjectAddress * RenameTypeAttributeStmtObjectAddress(RenameStmt *st
|
|||
bool missing_ok);
|
||||
extern const ObjectAddress * AlterTypeOwnerObjectAddress(AlterOwnerStmt *stmt,
|
||||
bool missing_ok);
|
||||
extern DropStmt * CreateDropStmtBasedOnCompositeTypeStmt(CompositeTypeStmt *stmt);
|
||||
extern DropStmt * CreateDropStmtBasedOnEnumStmt(CreateEnumStmt *stmt);
|
||||
extern List * CreateTypeDDLCommandsIdempotent(const ObjectAddress *typeAddress);
|
||||
extern char * GenerateBackupNameForTypeCollision(const ObjectAddress *address);
|
||||
extern RenameStmt * CreateRenameTypeStmt(const ObjectAddress *address, char *newName);
|
||||
|
||||
/* vacuum.c - froward declarations */
|
||||
extern void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
SET citus.next_shard_id TO 20020000;
|
||||
CREATE SCHEMA type_conflict;
|
||||
SELECT run_command_on_workers($$CREATE SCHEMA type_conflict;$$);
|
||||
run_command_on_workers
|
||||
-------------------------------------
|
||||
(localhost,57637,t,"CREATE SCHEMA")
|
||||
(localhost,57638,t,"CREATE SCHEMA")
|
||||
(2 rows)
|
||||
|
||||
-- create a type on a worker that should not cause data loss once overwritten with a type
|
||||
-- from the coordinator
|
||||
\c - - - :worker_1_port
|
||||
SET citus.enable_ddl_propagation TO off;
|
||||
SET search_path TO type_conflict;
|
||||
CREATE TYPE my_precious_type AS (secret text, should bool);
|
||||
CREATE TABLE local_table (a int, b my_precious_type);
|
||||
INSERT INTO local_table VALUES (42, ('always bring a towel', true)::my_precious_type);
|
||||
\c - - - :master_port
|
||||
SET search_path TO type_conflict;
|
||||
-- overwrite the type on the worker from the coordinator. The type should be over written
|
||||
-- but the data should not have been destroyed
|
||||
CREATE TYPE my_precious_type AS (scatterd_secret text);
|
||||
-- verify the data is retained
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO type_conflict;
|
||||
-- show fields for table
|
||||
SELECT pg_class.relname,
|
||||
attname,
|
||||
atttype.typname
|
||||
FROM pg_attribute
|
||||
JOIN pg_class ON (attrelid = pg_class.oid)
|
||||
JOIN pg_type AS atttype ON (atttypid = atttype.oid)
|
||||
WHERE pg_class.relname = 'local_table'
|
||||
AND attnum > 0
|
||||
ORDER BY attnum;
|
||||
relname | attname | typname
|
||||
-------------+---------+----------------------------------
|
||||
local_table | a | int4
|
||||
local_table | b | my_precious_type(citus_backup_0)
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM local_table;
|
||||
a | b
|
||||
----+----------------------------
|
||||
42 | ("always bring a towel",t)
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
SET search_path TO type_conflict;
|
||||
-- make sure worker_create_or_replace correctly generates new names while types are existing
|
||||
SELECT worker_create_or_replace_object('CREATE TYPE type_conflict.multi_conflicting_type AS (a int, b int);');
|
||||
worker_create_or_replace_object
|
||||
---------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT worker_create_or_replace_object('CREATE TYPE type_conflict.multi_conflicting_type AS (a int, b int, c int);');
|
||||
worker_create_or_replace_object
|
||||
---------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT worker_create_or_replace_object('CREATE TYPE type_conflict.multi_conflicting_type AS (a int, b int, c int, d int);');
|
||||
worker_create_or_replace_object
|
||||
---------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT worker_create_or_replace_object('CREATE TYPE type_conflict.multi_conflicting_type_with_a_really_long_name_that_truncates AS (a int, b int);');
|
||||
worker_create_or_replace_object
|
||||
---------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT worker_create_or_replace_object('CREATE TYPE type_conflict.multi_conflicting_type_with_a_really_long_name_that_truncates AS (a int, b int, c int);');
|
||||
worker_create_or_replace_object
|
||||
---------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT worker_create_or_replace_object('CREATE TYPE type_conflict.multi_conflicting_type_with_a_really_long_name_that_truncates AS (a int, b int, c int, d int);');
|
||||
worker_create_or_replace_object
|
||||
---------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- verify they have been created with their names and attributes
|
||||
SELECT pg_type.typname,
|
||||
string_agg(attname || ' ' || atttype.typname, ', ' ORDER BY attnum) AS fields
|
||||
FROM pg_attribute
|
||||
JOIN pg_class ON (attrelid = pg_class.oid)
|
||||
JOIN pg_type ON (pg_class.reltype = pg_type.oid)
|
||||
JOIN pg_type AS atttype ON (atttypid = atttype.oid)
|
||||
WHERE pg_type.typname LIKE 'multi_conflicting_type%'
|
||||
GROUP BY pg_type.typname;
|
||||
typname | fields
|
||||
-----------------------------------------------------------------+--------------------------------
|
||||
multi_conflicting_type | a int4, b int4, c int4, d int4
|
||||
multi_conflicting_type(citus_backup_0) | a int4, b int4
|
||||
multi_conflicting_type(citus_backup_1) | a int4, b int4, c int4
|
||||
multi_conflicting_type_with_a_really_long_name_(citus_backup_0) | a int4, b int4
|
||||
multi_conflicting_type_with_a_really_long_name_(citus_backup_1) | a int4, b int4, c int4
|
||||
multi_conflicting_type_with_a_really_long_name_that_truncates | a int4, b int4, c int4, d int4
|
||||
(6 rows)
|
||||
|
||||
-- hide cascades
|
||||
SET client_min_messages TO error;
|
||||
DROP SCHEMA type_conflict CASCADE;
|
|
@ -280,5 +280,5 @@ test: ssl_by_default
|
|||
# ---------
|
||||
# object distribution tests
|
||||
# ---------
|
||||
test: distributed_types
|
||||
test: distributed_types distributed_types_conflict
|
||||
test: distributed_functions
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
SET citus.next_shard_id TO 20020000;
|
||||
|
||||
CREATE SCHEMA type_conflict;
|
||||
SELECT run_command_on_workers($$CREATE SCHEMA type_conflict;$$);
|
||||
|
||||
-- create a type on a worker that should not cause data loss once overwritten with a type
|
||||
-- from the coordinator
|
||||
\c - - - :worker_1_port
|
||||
SET citus.enable_ddl_propagation TO off;
|
||||
SET search_path TO type_conflict;
|
||||
CREATE TYPE my_precious_type AS (secret text, should bool);
|
||||
CREATE TABLE local_table (a int, b my_precious_type);
|
||||
INSERT INTO local_table VALUES (42, ('always bring a towel', true)::my_precious_type);
|
||||
\c - - - :master_port
|
||||
SET search_path TO type_conflict;
|
||||
|
||||
-- overwrite the type on the worker from the coordinator. The type should be over written
|
||||
-- but the data should not have been destroyed
|
||||
CREATE TYPE my_precious_type AS (scatterd_secret text);
|
||||
|
||||
-- verify the data is retained
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO type_conflict;
|
||||
-- show fields for table
|
||||
SELECT pg_class.relname,
|
||||
attname,
|
||||
atttype.typname
|
||||
FROM pg_attribute
|
||||
JOIN pg_class ON (attrelid = pg_class.oid)
|
||||
JOIN pg_type AS atttype ON (atttypid = atttype.oid)
|
||||
WHERE pg_class.relname = 'local_table'
|
||||
AND attnum > 0
|
||||
ORDER BY attnum;
|
||||
|
||||
SELECT * FROM local_table;
|
||||
|
||||
\c - - - :master_port
|
||||
SET search_path TO type_conflict;
|
||||
|
||||
-- make sure worker_create_or_replace correctly generates new names while types are existing
|
||||
SELECT worker_create_or_replace_object('CREATE TYPE type_conflict.multi_conflicting_type AS (a int, b int);');
|
||||
SELECT worker_create_or_replace_object('CREATE TYPE type_conflict.multi_conflicting_type AS (a int, b int, c int);');
|
||||
SELECT worker_create_or_replace_object('CREATE TYPE type_conflict.multi_conflicting_type AS (a int, b int, c int, d int);');
|
||||
|
||||
SELECT worker_create_or_replace_object('CREATE TYPE type_conflict.multi_conflicting_type_with_a_really_long_name_that_truncates AS (a int, b int);');
|
||||
SELECT worker_create_or_replace_object('CREATE TYPE type_conflict.multi_conflicting_type_with_a_really_long_name_that_truncates AS (a int, b int, c int);');
|
||||
SELECT worker_create_or_replace_object('CREATE TYPE type_conflict.multi_conflicting_type_with_a_really_long_name_that_truncates AS (a int, b int, c int, d int);');
|
||||
|
||||
-- verify they have been created with their names and attributes
|
||||
SELECT pg_type.typname,
|
||||
string_agg(attname || ' ' || atttype.typname, ', ' ORDER BY attnum) AS fields
|
||||
FROM pg_attribute
|
||||
JOIN pg_class ON (attrelid = pg_class.oid)
|
||||
JOIN pg_type ON (pg_class.reltype = pg_type.oid)
|
||||
JOIN pg_type AS atttype ON (atttypid = atttype.oid)
|
||||
WHERE pg_type.typname LIKE 'multi_conflicting_type%'
|
||||
GROUP BY pg_type.typname;
|
||||
|
||||
-- hide cascades
|
||||
SET client_min_messages TO error;
|
||||
DROP SCHEMA type_conflict CASCADE;
|
Loading…
Reference in New Issue