diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 87491a4f5..d36662e7e 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -27,6 +27,7 @@ #include "distributed/worker_transaction.h" #include "miscadmin.h" #include "storage/lmgr.h" +#include "utils/builtins.h" #include "utils/lsyscache.h" typedef bool (*AddressPredicate)(const ObjectAddress *); @@ -38,6 +39,8 @@ static List * FilterObjectAddressListByPredicate(List *objectAddressList, AddressPredicate predicate); static void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target); static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency); +static char * CreateDropTableIfExistsCommand(Oid relationId); +static bool IsObjectCitusTable(ObjectAddress *dependency); static bool ShouldPropagateObject(const ObjectAddress *address); /* @@ -507,6 +510,33 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency) } +/* + * CreateDropTableIfExistsCommand gets a relationId and returns + * DROP [FOREIGN] TABLE .. CASCADE command for that. + */ +static char * +CreateDropTableIfExistsCommand(Oid relationId) +{ + char *schemaName = get_namespace_name(get_rel_namespace(relationId)); + char *relationName = get_rel_name(relationId); + + StringInfo workerDropQuery = makeStringInfo(); + + const char *quotedRelName = quote_qualified_identifier(schemaName, relationName); + if (IsForeignTable(relationId)) + { + appendStringInfo(workerDropQuery, DROP_FOREIGN_TABLE_COMMAND, quotedRelName); + } + else + { + appendStringInfo(workerDropQuery, DROP_REGULAR_TABLE_COMMAND, quotedRelName); + } + + + return workerDropQuery->data; +} + + /* * GetAllDependencyCreateDDLCommands iteratively calls GetDependencyCreateDDLCommands * for given dependencies. @@ -550,6 +580,34 @@ ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort) dependencies = FilterObjectAddressListByPredicate(dependencies, &SupportedDependencyByCitus); + ObjectAddress *dependency = NULL; + foreach_ptr(dependency, dependencies) + { + if (IsAnyObjectAddressOwnedByExtension(list_make1(dependency), NULL)) + { + /* + * we expect extension-owned objects to be created as a result + * of the extension being created. + */ + continue; + } + + /* + * We should make CREATE table idempotent. Given that there is not + * CREATE OR REPLACE table synctax, we should drop first. + * + * And, we should drop the table(s) before actually getting into + * dependency object creation, otherwise sequences might be + * dropped wrongly. + */ + if (IsObjectCitusTable(dependency)) + { + /* let's make table creation idempotent as well */ + char *dropTableCommand = CreateDropTableIfExistsCommand(dependency->objectId); + ddlCommands = lappend(ddlCommands, dropTableCommand); + } + } + /* * When dependency lists are getting longer we see a delay in the creation time on the * workers. We would like to inform the user. Currently we warn for lists greater than @@ -566,7 +624,7 @@ ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort) } dependencies = OrderObjectAddressListInDependencyOrder(dependencies); - ObjectAddress *dependency = NULL; + dependency = NULL; foreach_ptr(dependency, dependencies) { if (IsAnyObjectAddressOwnedByExtension(list_make1(dependency), NULL)) @@ -588,6 +646,29 @@ ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort) } +/* + * IsObjectCitusTable is a helper function which gets an ObjectAddress and + * returns true if the object is a Citus table. + */ +static bool +IsObjectCitusTable(ObjectAddress *dependency) +{ + if (getObjectClass(dependency) == OCLASS_CLASS) + { + char relKind = get_rel_relkind(dependency->objectId); + if (relKind == RELKIND_RELATION || relKind == RELKIND_PARTITIONED_TABLE || + relKind == RELKIND_FOREIGN_TABLE) + { + Oid relationId = dependency->objectId; + + return IsCitusTable(relationId); + } + } + + return false; +} + + /* * ShouldPropagate determines if we should be propagating anything */ diff --git a/src/test/regress/expected/failure_add_disable_node.out b/src/test/regress/expected/failure_add_disable_node.out index d2a389d96..a86c5cde5 100644 --- a/src/test/regress/expected/failure_add_disable_node.out +++ b/src/test/regress/expected/failure_add_disable_node.out @@ -105,9 +105,8 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE SCHEMA").kill()'); (1 row) SELECT master_activate_node('localhost', :worker_2_proxy_port); -WARNING: connection not open +ERROR: server closed the connection unexpectedly CONTEXT: while executing command on localhost:xxxxx -ERROR: failure on connection marked as essential: localhost:xxxxx -- verify node is not activated SELECT * FROM master_get_active_worker_nodes() ORDER BY 1, 2;