Move DropShards() and master_drop_sequences() to new framework.

This includes parallelizing DropShards, and always using transactions,
which should greatly increase DROP TABLE performance for tables with a
lot of shards.
pull/775/head
Andres Freund 2016-09-02 16:27:19 -07:00
parent 8d42d18c81
commit 0850204bc5
5 changed files with 60 additions and 70 deletions

View File

@ -24,6 +24,7 @@
#include "access/xact.h" #include "access/xact.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "distributed/connection_management.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/multi_client_executor.h" #include "distributed/multi_client_executor.h"
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
@ -32,6 +33,7 @@
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard.h"
#include "distributed/remote_commands.h"
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
@ -190,11 +192,10 @@ master_drop_all_shards(PG_FUNCTION_ARGS)
char *schemaName = NULL; char *schemaName = NULL;
char *relationName = NULL; char *relationName = NULL;
bool isTopLevel = true;
List *shardIntervalList = NIL; List *shardIntervalList = NIL;
int droppedShardCount = 0; int droppedShardCount = 0;
PreventTransactionChain(isTopLevel, "DROP distributed table"); BeginOrContinueCoordinatedTransaction();
relationName = get_rel_name(relationId); relationName = get_rel_name(relationId);
@ -253,9 +254,11 @@ master_drop_sequences(PG_FUNCTION_ARGS)
ArrayIterator sequenceIterator = NULL; ArrayIterator sequenceIterator = NULL;
Datum sequenceText = 0; Datum sequenceText = 0;
bool isNull = false; bool isNull = false;
MultiConnection *connection = NULL;
StringInfo dropSeqCommand = makeStringInfo(); StringInfo dropSeqCommand = makeStringInfo();
BeginOrContinueCoordinatedTransaction();
/* iterate over sequence names to build single command to DROP them all */ /* iterate over sequence names to build single command to DROP them all */
sequenceIterator = array_create_iterator(sequenceNamesArray, 0, NULL); sequenceIterator = array_create_iterator(sequenceNamesArray, 0, NULL);
while (array_iterate(sequenceIterator, &sequenceText, &isNull)) while (array_iterate(sequenceIterator, &sequenceText, &isNull))
@ -280,7 +283,9 @@ master_drop_sequences(PG_FUNCTION_ARGS)
appendStringInfo(dropSeqCommand, " %s", TextDatumGetCString(sequenceText)); appendStringInfo(dropSeqCommand, " %s", TextDatumGetCString(sequenceText));
} }
dropSuccessful = ExecuteRemoteCommand(nodeName, nodePort, dropSeqCommand); connection = GetNodeConnection(NEW_CONNECTION | CACHED_CONNECTION,
nodeName, nodePort);
dropSuccessful = ExecuteCheckStatement(connection, dropSeqCommand->data);
if (!dropSuccessful) if (!dropSuccessful)
{ {
ereport(WARNING, (errmsg("could not delete sequences from node \"%s:" INT64_FORMAT ereport(WARNING, (errmsg("could not delete sequences from node \"%s:" INT64_FORMAT
@ -305,15 +310,15 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
{ {
ListCell *shardIntervalCell = NULL; ListCell *shardIntervalCell = NULL;
int droppedShardCount = 0; int droppedShardCount = 0;
List *commandList = NIL;
ListCell *commandCell = NULL;
BeginOrContinueCoordinatedTransaction();
foreach(shardIntervalCell, deletableShardIntervalList) foreach(shardIntervalCell, deletableShardIntervalList)
{ {
List *shardPlacementList = NIL; List *shardPlacementList = NIL;
List *droppedPlacementList = NIL;
List *lingeringPlacementList = NIL;
ListCell *shardPlacementCell = NULL; ListCell *shardPlacementCell = NULL;
ListCell *droppedPlacementCell = NULL;
ListCell *lingeringPlacementCell = NULL;
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId; uint64 shardId = shardInterval->shardId;
char *quotedShardName = NULL; char *quotedShardName = NULL;
@ -328,14 +333,11 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
shardPlacementList = ShardPlacementList(shardId); shardPlacementList = ShardPlacementList(shardId);
foreach(shardPlacementCell, shardPlacementList) foreach(shardPlacementCell, shardPlacementList)
{ {
ShardPlacement *shardPlacement = ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
(ShardPlacement *) lfirst(shardPlacementCell); BatchCommand *command = (BatchCommand *) palloc0(sizeof(BatchCommand));
char *workerName = shardPlacement->nodeName;
uint32 workerPort = shardPlacement->nodePort;
bool dropSuccessful = false;
StringInfo workerDropQuery = makeStringInfo(); StringInfo workerDropQuery = makeStringInfo();
char storageType = shardInterval->storageType; char storageType = shardInterval->storageType;
if (storageType == SHARD_STORAGE_TABLE) if (storageType == SHARD_STORAGE_TABLE)
{ {
appendStringInfo(workerDropQuery, DROP_REGULAR_TABLE_COMMAND, appendStringInfo(workerDropQuery, DROP_REGULAR_TABLE_COMMAND,
@ -348,58 +350,45 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
quotedShardName); quotedShardName);
} }
dropSuccessful = ExecuteRemoteCommand(workerName, workerPort, command->placement = placement;
workerDropQuery); command->connectionFlags = NEW_CONNECTION | CACHED_CONNECTION | FOR_DDL;
if (dropSuccessful) command->commandString = workerDropQuery->data;
{ command->userData = shardRelationName; /* for failure reporting */
droppedPlacementList = lappend(droppedPlacementList, shardPlacement);
} commandList = lappend(commandList, command);
else
{
lingeringPlacementList = lappend(lingeringPlacementList, shardPlacement);
}
} }
/* make sure we don't process cancel signals */ DeleteShardRow(shardId);
HOLD_INTERRUPTS(); }
foreach(droppedPlacementCell, droppedPlacementList) ExecuteBatchCommands(commandList);
{
ShardPlacement *placement = (ShardPlacement *) lfirst(droppedPlacementCell);
char *workerName = placement->nodeName;
uint32 workerPort = placement->nodePort;
DeleteShardPlacementRow(shardId, workerName, workerPort); foreach(commandCell, commandList)
} {
BatchCommand *command = (BatchCommand *) lfirst(commandCell);
ShardPlacement *placement = command->placement;
uint64 shardId = placement->shardId;
uint64 placementId = placement->placementId;
char *workerName = placement->nodeName;
uint32 workerPort = placement->nodePort;
uint64 oldShardLength = placement->shardLength;
const char *shardName = command->userData;
/* mark shard placements that we couldn't drop as to be deleted */ /* mark shard placements that we couldn't drop as to be deleted */
foreach(lingeringPlacementCell, lingeringPlacementList) if (command->failed)
{ {
ShardPlacement *placement = (ShardPlacement *) lfirst(lingeringPlacementCell);
uint64 placementId = placement->placementId;
char *workerName = placement->nodeName;
uint32 workerPort = placement->nodePort;
uint64 oldShardLength = placement->shardLength;
DeleteShardPlacementRow(shardId, workerName, workerPort); DeleteShardPlacementRow(shardId, workerName, workerPort);
InsertShardPlacementRow(shardId, placementId, FILE_TO_DELETE, oldShardLength, InsertShardPlacementRow(shardId, placementId, FILE_TO_DELETE, oldShardLength,
workerName, workerPort); workerName, workerPort);
ereport(WARNING, (errmsg("could not delete shard \"%s\" on node \"%s:%u\"", ereport(WARNING, (errmsg("could not delete shard \"%s\" on node \"%s:%u\"",
shardRelationName, workerName, workerPort), shardName, workerName, workerPort),
errdetail("Marking this shard placement for deletion"))); errdetail("Marking this shard placement for deletion")));
} }
else
DeleteShardRow(shardId);
if (QueryCancelPending)
{ {
ereport(WARNING, (errmsg("cancel requests are ignored during shard " DeleteShardPlacementRow(shardId, workerName, workerPort);
"deletion")));
QueryCancelPending = false;
} }
RESUME_INTERRUPTS();
} }
droppedShardCount = list_length(deletableShardIntervalList); droppedShardCount = list_length(deletableShardIntervalList);

View File

@ -22,15 +22,22 @@ ERROR: cannot execute ALTER TABLE command involving partition column
-- verify that the distribution column can't be dropped -- verify that the distribution column can't be dropped
ALTER TABLE testtableddl DROP COLUMN distributecol; ALTER TABLE testtableddl DROP COLUMN distributecol;
ERROR: cannot execute ALTER TABLE command involving partition column ERROR: cannot execute ALTER TABLE command involving partition column
-- verify that the table cannot be dropped in a transaction block -- verify that the table can be dropped in a transaction block
\set VERBOSITY terse
BEGIN; BEGIN;
SELECT 1;
?column?
----------
1
(1 row)
DROP TABLE testtableddl; DROP TABLE testtableddl;
ERROR: DROP distributed table cannot run inside a transaction block SELECT 1;
ROLLBACK; ?column?
\set VERBOSITY default ----------
-- verify that the table can be dropped 1
DROP TABLE testtableddl; (1 row)
COMMIT;
-- verify that the table can dropped even if shards exist -- verify that the table can dropped even if shards exist
CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL); CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL);
SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append'); SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append');

View File

@ -68,11 +68,8 @@ SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::r
--------- ---------
(0 rows) (0 rows)
-- command can not be run inside transaction -- verify command can be run inside transaction
BEGIN; TRUNCATE TABLE test_truncate_append; COMMIT; BEGIN; TRUNCATE TABLE test_truncate_append; COMMIT;
ERROR: DROP distributed table cannot run inside a transaction block
CONTEXT: SQL statement "SELECT master_drop_all_shards(TG_RELID, TG_TABLE_SCHEMA, TG_TABLE_NAME)"
PL/pgSQL function citus_truncate_trigger() line 13 at PERFORM
DROP TABLE test_truncate_append; DROP TABLE test_truncate_append;
-- --
-- truncate for range distribution -- truncate for range distribution

View File

@ -20,15 +20,12 @@ ALTER TABLE testtableddl ALTER COLUMN distributecol TYPE text;
-- verify that the distribution column can't be dropped -- verify that the distribution column can't be dropped
ALTER TABLE testtableddl DROP COLUMN distributecol; ALTER TABLE testtableddl DROP COLUMN distributecol;
-- verify that the table cannot be dropped in a transaction block -- verify that the table can be dropped in a transaction block
\set VERBOSITY terse
BEGIN; BEGIN;
SELECT 1;
DROP TABLE testtableddl; DROP TABLE testtableddl;
ROLLBACK; SELECT 1;
\set VERBOSITY default COMMIT;
-- verify that the table can be dropped
DROP TABLE testtableddl;
-- verify that the table can dropped even if shards exist -- verify that the table can dropped even if shards exist
CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL); CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL);

View File

@ -41,7 +41,7 @@ SELECT count(*) FROM test_truncate_append;
-- verify no shard exists anymore -- verify no shard exists anymore
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass; SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass;
-- command can not be run inside transaction -- verify command can be run inside transaction
BEGIN; TRUNCATE TABLE test_truncate_append; COMMIT; BEGIN; TRUNCATE TABLE test_truncate_append; COMMIT;
DROP TABLE test_truncate_append; DROP TABLE test_truncate_append;
@ -173,4 +173,4 @@ TRUNCATE TABLE "a b append";
-- verify all shards are dropped -- verify all shards are dropped
SELECT shardid FROM pg_dist_shard where logicalrelid = '"a b append"'::regclass; SELECT shardid FROM pg_dist_shard where logicalrelid = '"a b append"'::regclass;
DROP TABLE "a b append"; DROP TABLE "a b append";