diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index 0ed584ed8..63e743f77 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -24,6 +24,7 @@ #include "access/xact.h" #include "catalog/namespace.h" #include "commands/dbcommands.h" +#include "distributed/connection_management.h" #include "distributed/master_protocol.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_join_order.h" @@ -32,6 +33,7 @@ #include "distributed/multi_server_executor.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" +#include "distributed/remote_commands.h" #include "distributed/relay_utility.h" #include "distributed/worker_protocol.h" #include "lib/stringinfo.h" @@ -190,11 +192,10 @@ master_drop_all_shards(PG_FUNCTION_ARGS) char *schemaName = NULL; char *relationName = NULL; - bool isTopLevel = true; List *shardIntervalList = NIL; int droppedShardCount = 0; - PreventTransactionChain(isTopLevel, "DROP distributed table"); + BeginOrContinueCoordinatedTransaction(); relationName = get_rel_name(relationId); @@ -253,9 +254,11 @@ master_drop_sequences(PG_FUNCTION_ARGS) ArrayIterator sequenceIterator = NULL; Datum sequenceText = 0; bool isNull = false; - + MultiConnection *connection = NULL; StringInfo dropSeqCommand = makeStringInfo(); + BeginOrContinueCoordinatedTransaction(); + /* iterate over sequence names to build single command to DROP them all */ sequenceIterator = array_create_iterator(sequenceNamesArray, 0, NULL); while (array_iterate(sequenceIterator, &sequenceText, &isNull)) @@ -280,7 +283,9 @@ master_drop_sequences(PG_FUNCTION_ARGS) appendStringInfo(dropSeqCommand, " %s", TextDatumGetCString(sequenceText)); } - dropSuccessful = ExecuteRemoteCommand(nodeName, nodePort, dropSeqCommand); + connection = GetNodeConnection(NEW_CONNECTION | CACHED_CONNECTION, + nodeName, nodePort); + dropSuccessful = ExecuteCheckStatement(connection, dropSeqCommand->data); if (!dropSuccessful) { ereport(WARNING, (errmsg("could not delete sequences from node \"%s:" INT64_FORMAT @@ -305,15 +310,15 @@ DropShards(Oid relationId, char *schemaName, char *relationName, { ListCell *shardIntervalCell = NULL; int droppedShardCount = 0; + List *commandList = NIL; + ListCell *commandCell = NULL; + + BeginOrContinueCoordinatedTransaction(); foreach(shardIntervalCell, deletableShardIntervalList) { List *shardPlacementList = NIL; - List *droppedPlacementList = NIL; - List *lingeringPlacementList = NIL; ListCell *shardPlacementCell = NULL; - ListCell *droppedPlacementCell = NULL; - ListCell *lingeringPlacementCell = NULL; ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); uint64 shardId = shardInterval->shardId; char *quotedShardName = NULL; @@ -328,14 +333,11 @@ DropShards(Oid relationId, char *schemaName, char *relationName, shardPlacementList = ShardPlacementList(shardId); foreach(shardPlacementCell, shardPlacementList) { - ShardPlacement *shardPlacement = - (ShardPlacement *) lfirst(shardPlacementCell); - char *workerName = shardPlacement->nodeName; - uint32 workerPort = shardPlacement->nodePort; - bool dropSuccessful = false; + ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell); + BatchCommand *command = (BatchCommand *) palloc0(sizeof(BatchCommand)); StringInfo workerDropQuery = makeStringInfo(); - char storageType = shardInterval->storageType; + if (storageType == SHARD_STORAGE_TABLE) { appendStringInfo(workerDropQuery, DROP_REGULAR_TABLE_COMMAND, @@ -348,58 +350,45 @@ DropShards(Oid relationId, char *schemaName, char *relationName, quotedShardName); } - dropSuccessful = ExecuteRemoteCommand(workerName, workerPort, - workerDropQuery); - if (dropSuccessful) - { - droppedPlacementList = lappend(droppedPlacementList, shardPlacement); - } - else - { - lingeringPlacementList = lappend(lingeringPlacementList, shardPlacement); - } + command->placement = placement; + command->connectionFlags = NEW_CONNECTION | CACHED_CONNECTION | FOR_DDL; + command->commandString = workerDropQuery->data; + command->userData = shardRelationName; /* for failure reporting */ + + commandList = lappend(commandList, command); } - /* make sure we don't process cancel signals */ - HOLD_INTERRUPTS(); + DeleteShardRow(shardId); + } - foreach(droppedPlacementCell, droppedPlacementList) - { - ShardPlacement *placement = (ShardPlacement *) lfirst(droppedPlacementCell); - char *workerName = placement->nodeName; - uint32 workerPort = placement->nodePort; + ExecuteBatchCommands(commandList); - DeleteShardPlacementRow(shardId, workerName, workerPort); - } + foreach(commandCell, commandList) + { + BatchCommand *command = (BatchCommand *) lfirst(commandCell); + ShardPlacement *placement = command->placement; + uint64 shardId = placement->shardId; + uint64 placementId = placement->placementId; + char *workerName = placement->nodeName; + uint32 workerPort = placement->nodePort; + uint64 oldShardLength = placement->shardLength; + const char *shardName = command->userData; /* mark shard placements that we couldn't drop as to be deleted */ - foreach(lingeringPlacementCell, lingeringPlacementList) + if (command->failed) { - ShardPlacement *placement = (ShardPlacement *) lfirst(lingeringPlacementCell); - uint64 placementId = placement->placementId; - char *workerName = placement->nodeName; - uint32 workerPort = placement->nodePort; - uint64 oldShardLength = placement->shardLength; - DeleteShardPlacementRow(shardId, workerName, workerPort); InsertShardPlacementRow(shardId, placementId, FILE_TO_DELETE, oldShardLength, workerName, workerPort); ereport(WARNING, (errmsg("could not delete shard \"%s\" on node \"%s:%u\"", - shardRelationName, workerName, workerPort), + shardName, workerName, workerPort), errdetail("Marking this shard placement for deletion"))); } - - DeleteShardRow(shardId); - - if (QueryCancelPending) + else { - ereport(WARNING, (errmsg("cancel requests are ignored during shard " - "deletion"))); - QueryCancelPending = false; + DeleteShardPlacementRow(shardId, workerName, workerPort); } - - RESUME_INTERRUPTS(); } droppedShardCount = list_length(deletableShardIntervalList); diff --git a/src/test/regress/expected/multi_table_ddl.out b/src/test/regress/expected/multi_table_ddl.out index 7da6ce247..2c4dd6ddd 100644 --- a/src/test/regress/expected/multi_table_ddl.out +++ b/src/test/regress/expected/multi_table_ddl.out @@ -22,15 +22,22 @@ ERROR: cannot execute ALTER TABLE command involving partition column -- verify that the distribution column can't be dropped ALTER TABLE testtableddl DROP COLUMN distributecol; ERROR: cannot execute ALTER TABLE command involving partition column --- verify that the table cannot be dropped in a transaction block -\set VERBOSITY terse +-- verify that the table can be dropped in a transaction block BEGIN; +SELECT 1; + ?column? +---------- + 1 +(1 row) + DROP TABLE testtableddl; -ERROR: DROP distributed table cannot run inside a transaction block -ROLLBACK; -\set VERBOSITY default --- verify that the table can be dropped -DROP TABLE testtableddl; +SELECT 1; + ?column? +---------- + 1 +(1 row) + +COMMIT; -- verify that the table can dropped even if shards exist CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL); SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append'); diff --git a/src/test/regress/expected/multi_truncate.out b/src/test/regress/expected/multi_truncate.out index 09fb65e06..795d5e74b 100644 --- a/src/test/regress/expected/multi_truncate.out +++ b/src/test/regress/expected/multi_truncate.out @@ -68,11 +68,8 @@ SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::r --------- (0 rows) --- command can not be run inside transaction +-- verify command can be run inside transaction BEGIN; TRUNCATE TABLE test_truncate_append; COMMIT; -ERROR: DROP distributed table cannot run inside a transaction block -CONTEXT: SQL statement "SELECT master_drop_all_shards(TG_RELID, TG_TABLE_SCHEMA, TG_TABLE_NAME)" -PL/pgSQL function citus_truncate_trigger() line 13 at PERFORM DROP TABLE test_truncate_append; -- -- truncate for range distribution diff --git a/src/test/regress/sql/multi_table_ddl.sql b/src/test/regress/sql/multi_table_ddl.sql index 8cb2ddbf3..1aaf894d8 100644 --- a/src/test/regress/sql/multi_table_ddl.sql +++ b/src/test/regress/sql/multi_table_ddl.sql @@ -20,15 +20,12 @@ ALTER TABLE testtableddl ALTER COLUMN distributecol TYPE text; -- verify that the distribution column can't be dropped ALTER TABLE testtableddl DROP COLUMN distributecol; --- verify that the table cannot be dropped in a transaction block -\set VERBOSITY terse +-- verify that the table can be dropped in a transaction block BEGIN; +SELECT 1; DROP TABLE testtableddl; -ROLLBACK; -\set VERBOSITY default - --- verify that the table can be dropped -DROP TABLE testtableddl; +SELECT 1; +COMMIT; -- verify that the table can dropped even if shards exist CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL); diff --git a/src/test/regress/sql/multi_truncate.sql b/src/test/regress/sql/multi_truncate.sql index 4abc9f27e..d0fdf3e9d 100644 --- a/src/test/regress/sql/multi_truncate.sql +++ b/src/test/regress/sql/multi_truncate.sql @@ -41,7 +41,7 @@ SELECT count(*) FROM test_truncate_append; -- verify no shard exists anymore SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass; --- command can not be run inside transaction +-- verify command can be run inside transaction BEGIN; TRUNCATE TABLE test_truncate_append; COMMIT; DROP TABLE test_truncate_append; @@ -173,4 +173,4 @@ TRUNCATE TABLE "a b append"; -- verify all shards are dropped SELECT shardid FROM pg_dist_shard where logicalrelid = '"a b append"'::regclass; -DROP TABLE "a b append"; \ No newline at end of file +DROP TABLE "a b append";