mirror of https://github.com/citusdata/citus.git
Merge pull request #5713 from citusdata/prevent-deadlock-on-collation-create
* When a worker tried to create a collation which had a dependency in the same worker node, it would cause a deadlock, now it throws the correct "not a coordinator" error.pull/5701/head
commit
0ca060a820
|
@ -37,7 +37,7 @@ static char * CreateCollationDDLInternal(Oid collationId, Oid *collowner,
|
||||||
char **quotedCollationName);
|
char **quotedCollationName);
|
||||||
static List * FilterNameListForDistributedCollations(List *objects, bool missing_ok,
|
static List * FilterNameListForDistributedCollations(List *objects, bool missing_ok,
|
||||||
List **addresses);
|
List **addresses);
|
||||||
|
static bool ShouldPropagateDefineCollationStmt(void);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GetCreateCollationDDLInternal returns a CREATE COLLATE sql string for the
|
* GetCreateCollationDDLInternal returns a CREATE COLLATE sql string for the
|
||||||
|
@ -519,6 +519,26 @@ DefineCollationStmtObjectAddress(Node *node, bool missing_ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PreprocessDefineCollationStmt executed before the collation has been
|
||||||
|
* created locally to ensure that if the collation create statement will
|
||||||
|
* be propagated, the node is a coordinator node
|
||||||
|
*/
|
||||||
|
List *
|
||||||
|
PreprocessDefineCollationStmt(Node *node, const char *queryString,
|
||||||
|
ProcessUtilityContext processUtilityContext)
|
||||||
|
{
|
||||||
|
Assert(castNode(DefineStmt, node)->kind == OBJECT_COLLATION);
|
||||||
|
|
||||||
|
if (ShouldPropagateDefineCollationStmt())
|
||||||
|
{
|
||||||
|
EnsureCoordinator();
|
||||||
|
}
|
||||||
|
|
||||||
|
return NIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* PostprocessDefineCollationStmt executed after the collation has been
|
* PostprocessDefineCollationStmt executed after the collation has been
|
||||||
* created locally and before we create it on the worker nodes.
|
* created locally and before we create it on the worker nodes.
|
||||||
|
@ -531,16 +551,7 @@ PostprocessDefineCollationStmt(Node *node, const char *queryString)
|
||||||
{
|
{
|
||||||
Assert(castNode(DefineStmt, node)->kind == OBJECT_COLLATION);
|
Assert(castNode(DefineStmt, node)->kind == OBJECT_COLLATION);
|
||||||
|
|
||||||
if (!ShouldPropagate())
|
if (!ShouldPropagateDefineCollationStmt())
|
||||||
{
|
|
||||||
return NIL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If the create collation command is a part of a multi-statement transaction,
|
|
||||||
* do not propagate it
|
|
||||||
*/
|
|
||||||
if (IsMultiStatementTransaction())
|
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -548,13 +559,38 @@ PostprocessDefineCollationStmt(Node *node, const char *queryString)
|
||||||
ObjectAddress collationAddress =
|
ObjectAddress collationAddress =
|
||||||
DefineCollationStmtObjectAddress(node, false);
|
DefineCollationStmtObjectAddress(node, false);
|
||||||
|
|
||||||
if (IsObjectDistributed(&collationAddress))
|
|
||||||
{
|
|
||||||
EnsureCoordinator();
|
|
||||||
}
|
|
||||||
|
|
||||||
EnsureDependenciesExistOnAllNodes(&collationAddress);
|
EnsureDependenciesExistOnAllNodes(&collationAddress);
|
||||||
|
|
||||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, CreateCollationDDLsIdempotent(
|
/* to prevent recursion with mx we disable ddl propagation */
|
||||||
|
List *commands = list_make1(DISABLE_DDL_PROPAGATION);
|
||||||
|
commands = list_concat(commands, CreateCollationDDLsIdempotent(
|
||||||
collationAddress.objectId));
|
collationAddress.objectId));
|
||||||
|
commands = lappend(commands, ENABLE_DDL_PROPAGATION);
|
||||||
|
|
||||||
|
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ShouldPropagateDefineCollationStmt checks if collation define
|
||||||
|
* statement should be propagated. Don't propagate if:
|
||||||
|
* - metadata syncing if off
|
||||||
|
* - statement is part of a multi stmt transaction and the multi shard connection
|
||||||
|
* type is not sequential
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
ShouldPropagateDefineCollationStmt()
|
||||||
|
{
|
||||||
|
if (!ShouldPropagate())
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsMultiStatementTransaction() &&
|
||||||
|
MultiShardConnectionType != SEQUENTIAL_CONNECTION)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -276,7 +276,7 @@ static DistributeObjectOps Collation_AlterOwner = {
|
||||||
static DistributeObjectOps Collation_Define = {
|
static DistributeObjectOps Collation_Define = {
|
||||||
.deparse = NULL,
|
.deparse = NULL,
|
||||||
.qualify = NULL,
|
.qualify = NULL,
|
||||||
.preprocess = NULL,
|
.preprocess = PreprocessDefineCollationStmt,
|
||||||
.postprocess = PostprocessDefineCollationStmt,
|
.postprocess = PostprocessDefineCollationStmt,
|
||||||
.address = DefineCollationStmtObjectAddress,
|
.address = DefineCollationStmtObjectAddress,
|
||||||
.markDistributed = true,
|
.markDistributed = true,
|
||||||
|
|
|
@ -151,6 +151,8 @@ extern ObjectAddress AlterCollationSchemaStmtObjectAddress(Node *stmt,
|
||||||
extern List * PostprocessAlterCollationSchemaStmt(Node *stmt, const char *queryString);
|
extern List * PostprocessAlterCollationSchemaStmt(Node *stmt, const char *queryString);
|
||||||
extern char * GenerateBackupNameForCollationCollision(const ObjectAddress *address);
|
extern char * GenerateBackupNameForCollationCollision(const ObjectAddress *address);
|
||||||
extern ObjectAddress DefineCollationStmtObjectAddress(Node *stmt, bool missing_ok);
|
extern ObjectAddress DefineCollationStmtObjectAddress(Node *stmt, bool missing_ok);
|
||||||
|
extern List * PreprocessDefineCollationStmt(Node *stmt, const char *queryString,
|
||||||
|
ProcessUtilityContext processUtilityContext);
|
||||||
extern List * PostprocessDefineCollationStmt(Node *stmt, const char *queryString);
|
extern List * PostprocessDefineCollationStmt(Node *stmt, const char *queryString);
|
||||||
|
|
||||||
/* database.c - forward declarations */
|
/* database.c - forward declarations */
|
||||||
|
|
|
@ -163,3 +163,19 @@ SELECT run_command_on_workers($$DROP USER collationuser;$$);
|
||||||
(localhost,57638,t,"DROP ROLE")
|
(localhost,57638,t,"DROP ROLE")
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
-- test creating a collation on a worker
|
||||||
|
CREATE COLLATION another_german_phonebook (provider = icu, locale = 'de-u-co-phonebk');
|
||||||
|
ERROR: operation is not allowed on this node
|
||||||
|
HINT: Connect to the coordinator and run it again.
|
||||||
|
-- test if creating a collation on a worker on a local
|
||||||
|
-- schema raises the right error
|
||||||
|
SET citus.enable_ddl_propagation TO off;
|
||||||
|
CREATE SCHEMA collation_creation_on_worker;
|
||||||
|
SET citus.enable_ddl_propagation TO on;
|
||||||
|
CREATE COLLATION collation_creation_on_worker.another_german_phonebook (provider = icu, locale = 'de-u-co-phonebk');
|
||||||
|
ERROR: operation is not allowed on this node
|
||||||
|
HINT: Connect to the coordinator and run it again.
|
||||||
|
SET citus.enable_ddl_propagation TO off;
|
||||||
|
DROP SCHEMA collation_creation_on_worker;
|
||||||
|
SET citus.enable_ddl_propagation TO on;
|
||||||
|
|
|
@ -93,3 +93,19 @@ DROP SCHEMA collation_tests CASCADE;
|
||||||
DROP SCHEMA collation_tests2 CASCADE;
|
DROP SCHEMA collation_tests2 CASCADE;
|
||||||
DROP USER collationuser;
|
DROP USER collationuser;
|
||||||
SELECT run_command_on_workers($$DROP USER collationuser;$$);
|
SELECT run_command_on_workers($$DROP USER collationuser;$$);
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
-- test creating a collation on a worker
|
||||||
|
CREATE COLLATION another_german_phonebook (provider = icu, locale = 'de-u-co-phonebk');
|
||||||
|
|
||||||
|
-- test if creating a collation on a worker on a local
|
||||||
|
-- schema raises the right error
|
||||||
|
SET citus.enable_ddl_propagation TO off;
|
||||||
|
CREATE SCHEMA collation_creation_on_worker;
|
||||||
|
SET citus.enable_ddl_propagation TO on;
|
||||||
|
|
||||||
|
CREATE COLLATION collation_creation_on_worker.another_german_phonebook (provider = icu, locale = 'de-u-co-phonebk');
|
||||||
|
|
||||||
|
SET citus.enable_ddl_propagation TO off;
|
||||||
|
DROP SCHEMA collation_creation_on_worker;
|
||||||
|
SET citus.enable_ddl_propagation TO on;
|
||||||
|
|
Loading…
Reference in New Issue