mirror of https://github.com/citusdata/citus.git
Refactor DropShards before introducing local DROP execution
parent 3c99db40b9
commit 873e9fd604
@@ -27,6 +27,7 @@
#include "commands/dbcommands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_client_executor.h"
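
The distributed/listutils.h include above appears to be what supplies the foreach_ptr() macro that the refactored loops further down rely on. The fragment below is a sketch rather than an excerpt from the commit; it only contrasts the two iteration idioms that show up in this diff:

/* before: PostgreSQL-style iteration over a ListCell, with an explicit cast */
ListCell *shardIntervalCell = NULL;
foreach(shardIntervalCell, deletableShardIntervalList)
{
    ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
    /* ... work with shardInterval ... */
}

/* after: foreach_ptr() assigns each element to a previously declared pointer */
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, deletableShardIntervalList)
{
    /* ... work with shardInterval directly, no ListCell or lfirst() cast ... */
}
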
@@ -77,6 +78,12 @@ static List * ShardsMatchingDeleteCriteria(Oid relationId, List *shardList,
                                           Node *deleteCriteria);
static int DropShards(Oid relationId, char *schemaName, char *relationName,
                      List *deletableShardIntervalList);
static void ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement,
                                                     const char *shardRelationName,
                                                     const char *dropShardPlacementCommand);
static char * CreateDropShardPlacementCommand(const char *schemaName,
                                              const char *shardRelationName, char
                                              storageType);


/* exports for SQL callable functions */
@@ -326,7 +333,7 @@ master_drop_sequences(PG_FUNCTION_ARGS)

/*
 * CheckTableSchemaNameForDrop errors out if the current user does not
 * have permission to undistribute the given relation, taking into
 * have permission to un-distribute the given relation, taking into
 * account that it may be called from the drop trigger. If the table exists,
 * the function rewrites the given table and schema name.
 */
@@ -359,93 +366,76 @@ static int
DropShards(Oid relationId, char *schemaName, char *relationName,
           List *deletableShardIntervalList)
{
    ListCell *shardIntervalCell = NULL;
    Assert(OidIsValid(relationId));
    Assert(schemaName != NULL);
    Assert(relationName != NULL);

    UseCoordinatedTransaction();

    /* At this point we intentionally decided to not use 2PC for reference tables */
    /*
     * At this point we intentionally decided to not use 2PC for reference
     * tables
     */
    if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
    {
        CoordinatedTransactionUse2PC();
    }

    foreach(shardIntervalCell, deletableShardIntervalList)
    ShardInterval *shardInterval = NULL;

    foreach_ptr(shardInterval, deletableShardIntervalList)
    {
        ListCell *shardPlacementCell = NULL;
        ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
        uint64 shardId = shardInterval->shardId;
        char *shardRelationName = pstrdup(relationName);

        Assert(shardInterval->relationId == relationId);

        /* Build shard relation name. */
        /* build shard relation name */
        AppendShardIdToName(&shardRelationName, shardId);
        char *quotedShardName = quote_qualified_identifier(schemaName, shardRelationName);

        List *shardPlacementList = ShardPlacementList(shardId);
        foreach(shardPlacementCell, shardPlacementList)

        ShardPlacement *shardPlacement = NULL;

        foreach_ptr(shardPlacement, shardPlacementList)
        {
            ShardPlacement *shardPlacement =
                (ShardPlacement *) lfirst(shardPlacementCell);
            char *workerName = shardPlacement->nodeName;
            uint32 workerPort = shardPlacement->nodePort;
            StringInfo workerDropQuery = makeStringInfo();
            uint32 connectionFlags = FOR_DDL;
            uint64 shardPlacementId = shardPlacement->placementId;

            char storageType = shardInterval->storageType;
            if (storageType == SHARD_STORAGE_TABLE)
            {
                appendStringInfo(workerDropQuery, DROP_REGULAR_TABLE_COMMAND,
                                 quotedShardName);
            }
            else if (storageType == SHARD_STORAGE_COLUMNAR ||
                     storageType == SHARD_STORAGE_FOREIGN)
            {
                appendStringInfo(workerDropQuery, DROP_FOREIGN_TABLE_COMMAND,
                                 quotedShardName);
            }

            /*
             * The active DROP SCHEMA/DATABASE ... CASCADE will drop the shard, if we
             * try to drop it over another connection, we will get into a distributed
             * deadlock.
             */
            if (shardPlacement->groupId == COORDINATOR_GROUP_ID &&
                IsCoordinator() &&
                DropSchemaOrDBInProgress())
            {
                DeleteShardPlacementRow(shardPlacement->placementId);
                continue;
                /*
                 * The active DROP SCHEMA/DATABASE ... CASCADE will drop the
                 * shard, if we try to drop it over another connection, we will
                 * get into a distributed deadlock.
                 */
            }

            MultiConnection *connection = GetPlacementConnection(connectionFlags,
                                                                 shardPlacement,
                                                                 NULL);

            RemoteTransactionBeginIfNecessary(connection);

            if (PQstatus(connection->pgConn) != CONNECTION_OK)
            else
            {
                uint64 placementId = shardPlacement->placementId;
                char storageType = shardInterval->storageType;

                ereport(WARNING, (errmsg("could not connect to shard \"%s\" on node "
                                         "\"%s:%u\"", shardRelationName, workerName,
                                         workerPort),
                                  errdetail("Marking this shard placement for "
                                            "deletion")));
                const char *dropShardPlacementCommand =
                    CreateDropShardPlacementCommand(schemaName, shardRelationName,
                                                    storageType);

                UpdateShardPlacementState(placementId, SHARD_STATE_TO_DELETE);

                continue;
                /*
                 * Try to open a new connection (or use an existing one) to
                 * connect to target node to drop shard placement over that
                 * remote connection
                 */
                ExecuteDropShardPlacementCommandRemotely(shardPlacement,
                                                         shardRelationName,
                                                         dropShardPlacementCommand);
            }

            MarkRemoteTransactionCritical(connection);

            ExecuteCriticalRemoteCommand(connection, workerDropQuery->data);

            DeleteShardPlacementRow(shardPlacement->placementId);
            DeleteShardPlacementRow(shardPlacementId);
        }

        /*
         * Now that we deleted all placements of the shard (or their metadata),
         * delete the shard metadata as well.
         */
        DeleteShardRow(shardId);
    }
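
Because the hunk above interleaves the removed and the added lines without +/- markers, the sketch below pieces together how the per-placement logic appears to read after the refactor. It is a best-effort reconstruction from the lines the hunk seems to add (the exact position of the local declarations and the branch shape are inferred, not verbatim from the commit):

ShardPlacement *shardPlacement = NULL;
foreach_ptr(shardPlacement, shardPlacementList)
{
    uint64 shardPlacementId = shardPlacement->placementId;

    if (shardPlacement->groupId == COORDINATOR_GROUP_ID &&
        IsCoordinator() &&
        DropSchemaOrDBInProgress())
    {
        /*
         * An in-progress DROP SCHEMA/DATABASE ... CASCADE already drops the
         * local shard; dropping it over another connection would cause a
         * distributed deadlock, so only the metadata is removed below.
         */
    }
    else
    {
        char storageType = shardInterval->storageType;
        const char *dropShardPlacementCommand =
            CreateDropShardPlacementCommand(schemaName, shardRelationName,
                                            storageType);

        /* drop the placement over a (possibly reused) remote connection */
        ExecuteDropShardPlacementCommandRemotely(shardPlacement,
                                                 shardRelationName,
                                                 dropShardPlacementCommand);
    }

    DeleteShardPlacementRow(shardPlacementId);
}
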
@@ -455,6 +445,89 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
}


/*
 * ExecuteDropShardPlacementCommandRemotely executes the given DROP shard command
 * via remote critical connection.
 */
static void
ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement,
                                         const char *shardRelationName,
                                         const char *dropShardPlacementCommand)
{
    Assert(shardPlacement != NULL);
    Assert(shardRelationName != NULL);
    Assert(dropShardPlacementCommand != NULL);

    uint32 connectionFlags = FOR_DDL;
    MultiConnection *connection = GetPlacementConnection(connectionFlags,
                                                         shardPlacement,
                                                         NULL);

    RemoteTransactionBeginIfNecessary(connection);

    if (PQstatus(connection->pgConn) != CONNECTION_OK)
    {
        uint64 placementId = shardPlacement->placementId;

        char *workerName = shardPlacement->nodeName;
        uint32 workerPort = shardPlacement->nodePort;

        ereport(WARNING, (errmsg("could not connect to shard \"%s\" on node "
                                 "\"%s:%u\"", shardRelationName, workerName,
                                 workerPort),
                          errdetail("Marking this shard placement for "
                                    "deletion")));

        UpdateShardPlacementState(placementId, SHARD_STATE_TO_DELETE);

        return;
    }

    MarkRemoteTransactionCritical(connection);

    ExecuteCriticalRemoteCommand(connection, dropShardPlacementCommand);
}


/*
 * CreateDropShardPlacementCommand function builds the DROP command to drop
 * the given shard relation by qualifying it with schema name according to
 * shard relation's storage type.
 */
static char *
CreateDropShardPlacementCommand(const char *schemaName, const char *shardRelationName,
                                char storageType)
{
    Assert(schemaName != NULL);
    Assert(shardRelationName != NULL);

    StringInfo workerDropQuery = makeStringInfo();

    const char *quotedShardName = quote_qualified_identifier(schemaName,
                                                             shardRelationName);

    /* build workerDropQuery according to shard storage type */
    if (storageType == SHARD_STORAGE_TABLE)
    {
        appendStringInfo(workerDropQuery, DROP_REGULAR_TABLE_COMMAND,
                         quotedShardName);
    }
    else if (storageType == SHARD_STORAGE_COLUMNAR ||
             storageType == SHARD_STORAGE_FOREIGN)
    {
        appendStringInfo(workerDropQuery, DROP_FOREIGN_TABLE_COMMAND,
                         quotedShardName);
    }
    else
    {
        /* no other storage type is expected here */
        Assert(false);
    }

    return workerDropQuery->data;
}
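
For illustration only: assuming DROP_REGULAR_TABLE_COMMAND and DROP_FOREIGN_TABLE_COMMAND are the usual "DROP [FOREIGN] TABLE IF EXISTS %s CASCADE" templates (their definitions are not part of this diff), a hypothetical call would yield a fully qualified command:

/* hypothetical shard name; "events_102008" is not taken from this commit */
char *dropCommand = CreateDropShardPlacementCommand("public", "events_102008",
                                                    SHARD_STORAGE_TABLE);
/* dropCommand would then read roughly: DROP TABLE IF EXISTS public.events_102008 CASCADE */
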
/* Checks that delete is only on one table. */
static void
CheckTableCount(Query *deleteQuery)