mirror of https://github.com/citusdata/citus.git
DROP TABLE becomes idempotent for metadata sync
parent
dc603d16fe
commit
dab1729f94
|
@ -27,6 +27,7 @@
|
||||||
#include "distributed/worker_transaction.h"
|
#include "distributed/worker_transaction.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
#include "storage/lmgr.h"
|
#include "storage/lmgr.h"
|
||||||
|
#include "utils/builtins.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
|
|
||||||
typedef bool (*AddressPredicate)(const ObjectAddress *);
|
typedef bool (*AddressPredicate)(const ObjectAddress *);
|
||||||
|
@ -38,6 +39,8 @@ static List * FilterObjectAddressListByPredicate(List *objectAddressList,
|
||||||
AddressPredicate predicate);
|
AddressPredicate predicate);
|
||||||
static void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
|
static void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
|
||||||
static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency);
|
static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency);
|
||||||
|
static char * CreateDropTableIfExistsCommand(Oid relationId);
|
||||||
|
static bool IsObjectCitusTable(ObjectAddress *dependency);
|
||||||
static bool ShouldPropagateObject(const ObjectAddress *address);
|
static bool ShouldPropagateObject(const ObjectAddress *address);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -507,6 +510,33 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreateDropTableIfExistsCommand gets a relationId and returns
|
||||||
|
* DROP [FOREIGN] TABLE .. CASCADE command for that.
|
||||||
|
*/
|
||||||
|
static char *
|
||||||
|
CreateDropTableIfExistsCommand(Oid relationId)
|
||||||
|
{
|
||||||
|
char *schemaName = get_namespace_name(get_rel_namespace(relationId));
|
||||||
|
char *relationName = get_rel_name(relationId);
|
||||||
|
|
||||||
|
StringInfo workerDropQuery = makeStringInfo();
|
||||||
|
|
||||||
|
const char *quotedRelName = quote_qualified_identifier(schemaName, relationName);
|
||||||
|
if (IsForeignTable(relationId))
|
||||||
|
{
|
||||||
|
appendStringInfo(workerDropQuery, DROP_FOREIGN_TABLE_COMMAND, quotedRelName);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
appendStringInfo(workerDropQuery, DROP_REGULAR_TABLE_COMMAND, quotedRelName);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
return workerDropQuery->data;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GetAllDependencyCreateDDLCommands iteratively calls GetDependencyCreateDDLCommands
|
* GetAllDependencyCreateDDLCommands iteratively calls GetDependencyCreateDDLCommands
|
||||||
* for given dependencies.
|
* for given dependencies.
|
||||||
|
@ -550,6 +580,34 @@ ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort)
|
||||||
dependencies = FilterObjectAddressListByPredicate(dependencies,
|
dependencies = FilterObjectAddressListByPredicate(dependencies,
|
||||||
&SupportedDependencyByCitus);
|
&SupportedDependencyByCitus);
|
||||||
|
|
||||||
|
ObjectAddress *dependency = NULL;
|
||||||
|
foreach_ptr(dependency, dependencies)
|
||||||
|
{
|
||||||
|
if (IsAnyObjectAddressOwnedByExtension(list_make1(dependency), NULL))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* we expect extension-owned objects to be created as a result
|
||||||
|
* of the extension being created.
|
||||||
|
*/
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We should make CREATE table idempotent. Given that there is not
|
||||||
|
* CREATE OR REPLACE table synctax, we should drop first.
|
||||||
|
*
|
||||||
|
* And, we should drop the table(s) before actually getting into
|
||||||
|
* dependency object creation, otherwise sequences might be
|
||||||
|
* dropped wrongly.
|
||||||
|
*/
|
||||||
|
if (IsObjectCitusTable(dependency))
|
||||||
|
{
|
||||||
|
/* let's make table creation idempotent as well */
|
||||||
|
char *dropTableCommand = CreateDropTableIfExistsCommand(dependency->objectId);
|
||||||
|
ddlCommands = lappend(ddlCommands, dropTableCommand);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* When dependency lists are getting longer we see a delay in the creation time on the
|
* When dependency lists are getting longer we see a delay in the creation time on the
|
||||||
* workers. We would like to inform the user. Currently we warn for lists greater than
|
* workers. We would like to inform the user. Currently we warn for lists greater than
|
||||||
|
@ -566,7 +624,7 @@ ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort)
|
||||||
}
|
}
|
||||||
|
|
||||||
dependencies = OrderObjectAddressListInDependencyOrder(dependencies);
|
dependencies = OrderObjectAddressListInDependencyOrder(dependencies);
|
||||||
ObjectAddress *dependency = NULL;
|
dependency = NULL;
|
||||||
foreach_ptr(dependency, dependencies)
|
foreach_ptr(dependency, dependencies)
|
||||||
{
|
{
|
||||||
if (IsAnyObjectAddressOwnedByExtension(list_make1(dependency), NULL))
|
if (IsAnyObjectAddressOwnedByExtension(list_make1(dependency), NULL))
|
||||||
|
@ -588,6 +646,29 @@ ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IsObjectCitusTable is a helper function which gets an ObjectAddress and
|
||||||
|
* returns true if the object is a Citus table.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
IsObjectCitusTable(ObjectAddress *dependency)
|
||||||
|
{
|
||||||
|
if (getObjectClass(dependency) == OCLASS_CLASS)
|
||||||
|
{
|
||||||
|
char relKind = get_rel_relkind(dependency->objectId);
|
||||||
|
if (relKind == RELKIND_RELATION || relKind == RELKIND_PARTITIONED_TABLE ||
|
||||||
|
relKind == RELKIND_FOREIGN_TABLE)
|
||||||
|
{
|
||||||
|
Oid relationId = dependency->objectId;
|
||||||
|
|
||||||
|
return IsCitusTable(relationId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ShouldPropagate determines if we should be propagating anything
|
* ShouldPropagate determines if we should be propagating anything
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -105,9 +105,8 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE SCHEMA").kill()');
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT master_activate_node('localhost', :worker_2_proxy_port);
|
SELECT master_activate_node('localhost', :worker_2_proxy_port);
|
||||||
WARNING: connection not open
|
ERROR: server closed the connection unexpectedly
|
||||||
CONTEXT: while executing command on localhost:xxxxx
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
ERROR: failure on connection marked as essential: localhost:xxxxx
|
|
||||||
-- verify node is not activated
|
-- verify node is not activated
|
||||||
SELECT * FROM master_get_active_worker_nodes()
|
SELECT * FROM master_get_active_worker_nodes()
|
||||||
ORDER BY 1, 2;
|
ORDER BY 1, 2;
|
||||||
|
|
Loading…
Reference in New Issue