From 484cb12cd038e5a3f3580df1efbe92826d78b837 Mon Sep 17 00:00:00 2001 From: Burak Yucesoy Date: Wed, 18 Jan 2017 12:00:56 +0200 Subject: [PATCH 1/3] Add LoadShardPlacement UDF This UDF returns a shard placement from cache given shard id and placement id. At the moment it iterates over all shard placements of given shard by ShardPlacementList and searches given placement id in that list, which is not a good solution performance-wise. However, currently, this function will be used only when there is a failed transaction. If a need arises we can optimize this function in the future. --- .../distributed/utils/metadata_cache.c | 42 +++++++++++++++++++ src/include/distributed/metadata_cache.h | 1 + 2 files changed, 43 insertions(+) diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 21ff34447..9567ab560 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -245,6 +245,48 @@ LoadShardInterval(uint64 shardId) } +/* + * LoadShardPlacement returns the, cached, metadata about a shard placement. + * + * The return value is a copy of the cached ShardPlacement struct and may + * therefore be modified and/or freed. + */ +ShardPlacement * +LoadShardPlacement(uint64 shardId, uint64 placementId) +{ + ShardCacheEntry *shardEntry = NULL; + DistTableCacheEntry *tableEntry = NULL; + + ShardPlacement *placementArray = NULL; + int numberOfPlacements = 0; + + int i = 0; + + shardEntry = LookupShardCacheEntry(shardId); + tableEntry = shardEntry->tableEntry; + + /* the offset better be in a valid range */ + Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength); + + placementArray = tableEntry->arrayOfPlacementArrays[shardEntry->shardIndex]; + numberOfPlacements = tableEntry->arrayOfPlacementArrayLengths[shardEntry->shardIndex]; + + for (i = 0; i < numberOfPlacements; i++) + { + if (placementArray[i].placementId == placementId) + { + ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement); + CopyShardPlacement(&placementArray[i], shardPlacement); + + return shardPlacement; + } + } + + ereport(ERROR, (errmsg("could not find valid entry for shard placement " + UINT64_FORMAT, placementId))); +} + + /* * ShardPlacementList returns the list of placements for the given shard from * the cache. diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 314002acc..abd1861c0 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -59,6 +59,7 @@ typedef struct extern bool IsDistributedTable(Oid relationId); extern List * DistributedTableList(void); extern ShardInterval * LoadShardInterval(uint64 shardId); +extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern int GetLocalGroupId(void); extern List * DistTableOidList(void); From 2489c59c15d69ee92f4b02425c49af5dcdc872cc Mon Sep 17 00:00:00 2001 From: Burak Yucesoy Date: Wed, 18 Jan 2017 12:01:28 +0200 Subject: [PATCH 2/3] In case of failed transactions update shard state only if it is FILE_FINALIZED Before this change, when a transaction failed, we update related placements shard states to FILE_INACTIVE during XACT_EVENT_PRE_COMMIT. However that means if another code block changed shard state to something else (e.g. FILE_TO_DELETE) before XACT_EVENT_PRE_COMMIT we overwrite that. To prevent that problem, in case of failure we started to change shard state, only if its current shard state is FILE_FINALIZED. --- .../distributed/connection/placement_connection.c | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 5dcbab9c3..7df453945 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -837,7 +837,18 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry, if (placementEntry->failed) { - UpdateShardPlacementState(placementEntry->key.placementId, FILE_INACTIVE); + uint64 shardId = shardEntry->key.shardId; + uint64 placementId = placementEntry->key.placementId; + ShardPlacement *shardPlacement = LoadShardPlacement(shardId, placementId); + + /* + * We only set shard state if its current state is FILE_FINALIZED, which + * prevents overwriting shard state if it is already set at somewhere else. + */ + if (shardPlacement->shardState == FILE_FINALIZED) + { + UpdateShardPlacementState(placementEntry->key.placementId, FILE_INACTIVE); + } } } From d80e7849a4b22cdd0fb17e0a380c156fade7e701 Mon Sep 17 00:00:00 2001 From: Burak Yucesoy Date: Wed, 18 Jan 2017 12:02:11 +0200 Subject: [PATCH 3/3] Convert DropShards to use new connection API With this change DropShards function started to use new connection API. DropShards function is used by DROP TABLE, master_drop_all_shards and master_apply_delete_command, therefore all of these functions now support transactional operations. In DropShards function, if we cannot reach a node, we mark shard state of related placements as FILE_TO_DELETE and continue to drop remaining shards; however if any error occurs after establishing the connection, we ROLLBACK whole operation. --- .../master/master_delete_protocol.c | 98 ++- .../multi_remove_node_reference_table.out | 91 ++- .../multi_replicate_reference_table.out | 50 +- src/test/regress/expected/multi_table_ddl.out | 13 +- .../multi_transactional_drop_shards.out | 627 ++++++++++++++++++ src/test/regress/expected/multi_truncate.out | 5 +- .../input/multi_master_delete_protocol.source | 5 +- src/test/regress/multi_schedule | 5 + .../multi_master_delete_protocol.source | 16 +- .../sql/multi_remove_node_reference_table.sql | 38 +- .../sql/multi_replicate_reference_table.sql | 29 +- src/test/regress/sql/multi_table_ddl.sql | 8 +- .../sql/multi_transactional_drop_shards.sql | 369 +++++++++++ src/test/regress/sql/multi_truncate.sql | 2 +- 14 files changed, 1265 insertions(+), 91 deletions(-) create mode 100644 src/test/regress/expected/multi_transactional_drop_shards.out create mode 100644 src/test/regress/sql/multi_transactional_drop_shards.sql diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index 89e418ce7..ad793a128 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -16,6 +16,7 @@ #include "postgres.h" #include "c.h" #include "fmgr.h" +#include "libpq-fe.h" #include "miscadmin.h" #include "port.h" @@ -24,6 +25,7 @@ #include "access/xact.h" #include "catalog/namespace.h" #include "commands/dbcommands.h" +#include "distributed/connection_management.h" #include "distributed/master_protocol.h" #include "distributed/metadata_sync.h" #include "distributed/multi_client_executor.h" @@ -34,7 +36,9 @@ #include "distributed/multi_utility.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" +#include "distributed/placement_connection.h" #include "distributed/relay_utility.h" +#include "distributed/remote_commands.h" #include "distributed/worker_protocol.h" #include "distributed/worker_transaction.h" #include "lib/stringinfo.h" @@ -106,10 +110,8 @@ master_apply_delete_command(PG_FUNCTION_ARGS) bool dontWait = false; char partitionMethod = 0; bool failOK = false; - bool isTopLevel = true; EnsureSchemaNode(); - PreventTransactionChain(isTopLevel, "master_apply_delete_command"); queryTreeNode = ParseTreeNode(queryString); if (!IsA(queryTreeNode, DeleteStmt)) @@ -200,7 +202,6 @@ master_drop_all_shards(PG_FUNCTION_ARGS) text *schemaNameText = PG_GETARG_TEXT_P(1); text *relationNameText = PG_GETARG_TEXT_P(2); - bool isTopLevel = true; List *shardIntervalList = NIL; int droppedShardCount = 0; @@ -208,7 +209,6 @@ master_drop_all_shards(PG_FUNCTION_ARGS) char *relationName = text_to_cstring(relationNameText); EnsureSchemaNode(); - PreventTransactionChain(isTopLevel, "DROP distributed table"); CheckTableSchemaNameForDrop(relationId, &schemaName, &relationName); @@ -324,14 +324,26 @@ DropShards(Oid relationId, char *schemaName, char *relationName, ListCell *shardIntervalCell = NULL; int droppedShardCount = 0; + if (XactModificationLevel != XACT_MODIFICATION_NONE) + { + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("shard drop operations must not appear in " + "transaction blocks containing other distributed " + "modifications"))); + } + + BeginOrContinueCoordinatedTransaction(); + + /* At this point we intentionally decided to not use 2PC for reference tables */ + if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) + { + CoordinatedTransactionUse2PC(); + } + foreach(shardIntervalCell, deletableShardIntervalList) { List *shardPlacementList = NIL; - List *droppedPlacementList = NIL; - List *lingeringPlacementList = NIL; ListCell *shardPlacementCell = NULL; - ListCell *droppedPlacementCell = NULL; - ListCell *lingeringPlacementCell = NULL; ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); uint64 shardId = shardInterval->shardId; char *quotedShardName = NULL; @@ -350,8 +362,10 @@ DropShards(Oid relationId, char *schemaName, char *relationName, (ShardPlacement *) lfirst(shardPlacementCell); char *workerName = shardPlacement->nodeName; uint32 workerPort = shardPlacement->nodePort; - bool dropSuccessful = false; StringInfo workerDropQuery = makeStringInfo(); + MultiConnection *connection = NULL; + uint32 connectionFlags = FOR_DDL; + char *extensionOwner = CitusExtensionOwnerName(); char storageType = shardInterval->storageType; if (storageType == SHARD_STORAGE_TABLE) @@ -366,58 +380,34 @@ DropShards(Oid relationId, char *schemaName, char *relationName, quotedShardName); } - dropSuccessful = ExecuteRemoteCommand(workerName, workerPort, - workerDropQuery); - if (dropSuccessful) - { - droppedPlacementList = lappend(droppedPlacementList, shardPlacement); - } - else - { - lingeringPlacementList = lappend(lingeringPlacementList, shardPlacement); - } - } + connection = GetNodeUserDatabaseConnection(connectionFlags, workerName, + workerPort, extensionOwner, NULL); - /* make sure we don't process cancel signals */ - HOLD_INTERRUPTS(); + RemoteTransactionBeginIfNecessary(connection); - foreach(droppedPlacementCell, droppedPlacementList) - { - ShardPlacement *placement = (ShardPlacement *) lfirst(droppedPlacementCell); - char *workerName = placement->nodeName; - uint32 workerPort = placement->nodePort; + if (PQstatus(connection->pgConn) != CONNECTION_OK) + { + uint64 placementId = shardPlacement->placementId; + + ereport(WARNING, (errmsg("could not connect to shard \"%s\" on node " + "\"%s:%u\"", shardRelationName, workerName, + workerPort), + errdetail("Marking this shard placement for " + "deletion"))); + + UpdateShardPlacementState(placementId, FILE_TO_DELETE); + + continue; + } + + MarkRemoteTransactionCritical(connection); + + ExecuteCriticalRemoteCommand(connection, workerDropQuery->data); DeleteShardPlacementRow(shardId, workerName, workerPort); } - /* mark shard placements that we couldn't drop as to be deleted */ - foreach(lingeringPlacementCell, lingeringPlacementList) - { - ShardPlacement *placement = (ShardPlacement *) lfirst(lingeringPlacementCell); - uint64 placementId = placement->placementId; - char *workerName = placement->nodeName; - uint32 workerPort = placement->nodePort; - uint64 oldShardLength = placement->shardLength; - - DeleteShardPlacementRow(shardId, workerName, workerPort); - InsertShardPlacementRow(shardId, placementId, FILE_TO_DELETE, oldShardLength, - workerName, workerPort); - - ereport(WARNING, (errmsg("could not delete shard \"%s\" on node \"%s:%u\"", - shardRelationName, workerName, workerPort), - errdetail("Marking this shard placement for deletion"))); - } - DeleteShardRow(shardId); - - if (QueryCancelPending) - { - ereport(WARNING, (errmsg("cancel requests are ignored during shard " - "deletion"))); - QueryCancelPending = false; - } - - RESUME_INTERRUPTS(); } droppedShardCount = list_length(deletableShardIntervalList); diff --git a/src/test/regress/expected/multi_remove_node_reference_table.out b/src/test/regress/expected/multi_remove_node_reference_table.out index 17a81a35b..8446905e7 100644 --- a/src/test/regress/expected/multi_remove_node_reference_table.out +++ b/src/test/regress/expected/multi_remove_node_reference_table.out @@ -648,6 +648,35 @@ NOTICE: Replicating reference table "remove_node_reference_table" to all worker (1 row) -- test DROP table after removing a node in a transaction +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1380000 | 1 | 2 | 0 +(1 row) + BEGIN; SELECT master_remove_node('localhost', :worker_2_port); master_remove_node @@ -656,10 +685,44 @@ SELECT master_remove_node('localhost', :worker_2_port); (1 row) DROP TABLE remove_node_reference_table; -ERROR: DROP distributed table cannot run inside a transaction block -CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" -PL/pgSQL function citus_drop_trigger() line 21 at PERFORM -ROLLBACK; +COMMIT; +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * FROM pg_dist_colocation WHERE colocationid = 1380000; + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ +(0 rows) + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +--------------------------------------------- + (1380005,1380005,localhost,57638,default,f) +(1 row) + +-- re-create remove_node_reference_table +CREATE TABLE remove_node_reference_table(column1 int); +SELECT create_reference_table('remove_node_reference_table'); + create_reference_table +------------------------ + +(1 row) + -- test removing a node while there is a reference table at another schema CREATE SCHEMA remove_node_reference_table_schema; CREATE TABLE remove_node_reference_table_schema.table1(column1 int); @@ -686,8 +749,8 @@ ORDER BY shardid; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- - 1380000 | 1 | 0 | localhost | 57638 1380001 | 1 | 0 | localhost | 57638 + 1380002 | 1 | 0 | localhost | 57638 (2 rows) SELECT * @@ -698,7 +761,7 @@ WHERE colocationid IN WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1380000 | 1 | 2 | 0 + 1380001 | 1 | 2 | 0 (1 row) @@ -717,8 +780,8 @@ WHERE nodeport = :worker_2_port; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- - 1380000 | 1 | 0 | localhost | 57638 1380001 | 1 | 0 | localhost | 57638 + 1380002 | 1 | 0 | localhost | 57638 (2 rows) @@ -754,7 +817,7 @@ WHERE colocationid IN WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1380000 | 1 | 1 | 0 + 1380001 | 1 | 1 | 0 (1 row) \c - - - :worker_1_port @@ -783,7 +846,7 @@ NOTICE: Replicating reference table "remove_node_reference_table" to all worker NOTICE: Replicating reference table "table1" to all workers master_add_node --------------------------------------------- - (1380005,1380005,localhost,57638,default,f) + (1380006,1380006,localhost,57638,default,f) (1 row) -- test with master_disable_node @@ -804,8 +867,8 @@ ORDER BY shardid; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- - 1380000 | 1 | 0 | localhost | 57638 1380001 | 1 | 0 | localhost | 57638 + 1380002 | 1 | 0 | localhost | 57638 (2 rows) SELECT * @@ -816,7 +879,7 @@ WHERE colocationid IN WHERE logicalrelid = 'remove_node_reference_table'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1380000 | 1 | 2 | 0 + 1380001 | 1 | 2 | 0 (1 row) \c - - - :worker_1_port @@ -834,8 +897,8 @@ WHERE nodeport = :worker_2_port; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- - 1380000 | 1 | 0 | localhost | 57638 1380001 | 1 | 0 | localhost | 57638 + 1380002 | 1 | 0 | localhost | 57638 (2 rows) @@ -872,7 +935,7 @@ WHERE colocationid IN WHERE logicalrelid = 'remove_node_reference_table'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1380000 | 1 | 1 | 0 + 1380001 | 1 | 1 | 0 (1 row) \c - - - :worker_1_port @@ -900,7 +963,7 @@ NOTICE: Replicating reference table "remove_node_reference_table" to all worker NOTICE: Replicating reference table "table1" to all workers master_add_node --------------------------------------------- - (1380006,1380006,localhost,57638,default,f) + (1380007,1380007,localhost,57638,default,f) (1 row) -- DROP tables to clean workspace diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index 869a3f905..615eb09ad 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -526,6 +526,28 @@ SELECT create_reference_table('replicate_reference_table_drop'); (1 row) +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_drop'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1370009 | 1 | 1 | 0 +(1 row) + BEGIN; SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "replicate_reference_table_drop" to all workers @@ -535,12 +557,30 @@ NOTICE: Replicating reference table "replicate_reference_table_drop" to all wor (1 row) DROP TABLE replicate_reference_table_drop; -ERROR: DROP distributed table cannot run inside a transaction block -CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" -PL/pgSQL function citus_drop_trigger() line 21 at PERFORM -ROLLBACK; -DROP TABLE replicate_reference_table_drop; +COMMIT; +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + +SELECT * FROM pg_dist_colocation WHERE colocationid = 1370009; + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ +(0 rows) + -- test adding a node while there is a reference table at another schema +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + CREATE SCHEMA replicate_reference_table_schema; CREATE TABLE replicate_reference_table_schema.table1(column1 int); SELECT create_reference_table('replicate_reference_table_schema.table1'); diff --git a/src/test/regress/expected/multi_table_ddl.out b/src/test/regress/expected/multi_table_ddl.out index 9e230ba0b..508ac769b 100644 --- a/src/test/regress/expected/multi_table_ddl.out +++ b/src/test/regress/expected/multi_table_ddl.out @@ -22,13 +22,20 @@ ERROR: cannot execute ALTER TABLE command involving partition column -- verify that the distribution column can't be dropped ALTER TABLE testtableddl DROP COLUMN distributecol; ERROR: cannot execute ALTER TABLE command involving partition column --- verify that the table cannot be dropped in a transaction block +-- verify that the table can be dropped in a transaction block \set VERBOSITY terse BEGIN; DROP TABLE testtableddl; -ERROR: DROP distributed table cannot run inside a transaction block -ROLLBACK; +COMMIT; \set VERBOSITY default +-- recreate testtableddl +CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL); +SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append'); + master_create_distributed_table +--------------------------------- + +(1 row) + -- verify that the table can be dropped DROP TABLE testtableddl; -- verify that the table can dropped even if shards exist diff --git a/src/test/regress/expected/multi_transactional_drop_shards.out b/src/test/regress/expected/multi_transactional_drop_shards.out new file mode 100644 index 000000000..88bf6a0ba --- /dev/null +++ b/src/test/regress/expected/multi_transactional_drop_shards.out @@ -0,0 +1,627 @@ +-- +-- MULTI_TRANSACTIONAL_DROP_SHARDS +-- +-- Tests that check the metadata returned by the master node. +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1410000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1410000; +SET citus.shard_count TO 4; +-- test DROP TABLE(ergo master_drop_all_shards) in transaction, then ROLLBACK +CREATE TABLE transactional_drop_shards(column1 int); +SELECT create_distributed_table('transactional_drop_shards', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +BEGIN; +DROP TABLE transactional_drop_shards; +ROLLBACK; +-- verify metadata is not deleted +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid; + shardid +--------- + 1410000 + 1410001 + 1410002 + 1410003 +(4 rows) + +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid) +ORDER BY + shardid; + shardid | shardstate | nodename | nodeport +---------+------------+-----------+---------- + 1410000 | 1 | localhost | 57637 + 1410000 | 1 | localhost | 57638 + 1410001 | 1 | localhost | 57638 + 1410001 | 1 | localhost | 57637 + 1410002 | 1 | localhost | 57637 + 1410002 | 1 | localhost | 57638 + 1410003 | 1 | localhost | 57638 + 1410003 | 1 | localhost | 57637 +(8 rows) + +-- verify table is not dropped +\d transactional_drop_shards; +Table "public.transactional_drop_shards" + Column | Type | Modifiers +---------+---------+----------- + column1 | integer | + +-- verify shards are not dropped +\c - - - :worker_1_port +\d transactional_drop_shards_*; +Table "public.transactional_drop_shards_1410000" + Column | Type | Modifiers +---------+---------+----------- + column1 | integer | + +Table "public.transactional_drop_shards_1410001" + Column | Type | Modifiers +---------+---------+----------- + column1 | integer | + +Table "public.transactional_drop_shards_1410002" + Column | Type | Modifiers +---------+---------+----------- + column1 | integer | + +Table "public.transactional_drop_shards_1410003" + Column | Type | Modifiers +---------+---------+----------- + column1 | integer | + +\c - - - :master_port +-- test DROP TABLE(ergo master_drop_all_shards) in transaction, then COMMIT +BEGIN; +DROP TABLE transactional_drop_shards; +COMMIT; +-- verify metadata is deleted +SELECT shardid FROM pg_dist_shard WHERE shardid IN (1410000, 1410001, 1410002, 1410003) ORDER BY shardid; + shardid +--------- +(0 rows) + +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (1410000, 1410001, 1410002, 1410003) +ORDER BY + shardid; + shardid | shardstate | nodename | nodeport +---------+------------+----------+---------- +(0 rows) + +-- verify table is dropped +\d transactional_drop_shards; +-- verify shards are dropped +\c - - - :worker_1_port +\d transactional_drop_shards_*; +\c - - - :master_port +-- test master_delete_protocol in transaction, then ROLLBACK +CREATE TABLE transactional_drop_shards(column1 int); +SELECT create_distributed_table('transactional_drop_shards', 'column1', 'append'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT master_create_empty_shard('transactional_drop_shards'); + master_create_empty_shard +--------------------------- + 1410004 +(1 row) + +BEGIN; +SELECT master_apply_delete_command('DELETE FROM transactional_drop_shards'); + master_apply_delete_command +----------------------------- + 1 +(1 row) + +ROLLBACK; +-- verify metadata is not deleted +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid; + shardid +--------- + 1410004 +(1 row) + +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid) +ORDER BY + shardid; + shardid | shardstate | nodename | nodeport +---------+------------+-----------+---------- + 1410004 | 1 | localhost | 57638 + 1410004 | 1 | localhost | 57637 +(2 rows) + +-- verify shards are not dropped +\c - - - :worker_1_port +\d transactional_drop_shards_*; +Table "public.transactional_drop_shards_1410004" + Column | Type | Modifiers +---------+---------+----------- + column1 | integer | + +\c - - - :master_port +-- test master_delete_protocol in transaction, then COMMIT +BEGIN; +SELECT master_apply_delete_command('DELETE FROM transactional_drop_shards'); + master_apply_delete_command +----------------------------- + 1 +(1 row) + +COMMIT; +-- verify metadata is deleted +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid; + shardid +--------- +(0 rows) + +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid) +ORDER BY + shardid; + shardid | shardstate | nodename | nodeport +---------+------------+----------+---------- +(0 rows) + +-- verify shards are dropped +\c - - - :worker_1_port +\d transactional_drop_shards_*; +\c - - - :master_port +-- test DROP table in a transaction after insertion +SELECT master_create_empty_shard('transactional_drop_shards'); + master_create_empty_shard +--------------------------- + 1410005 +(1 row) + +BEGIN; +INSERT INTO transactional_drop_shards VALUES (1); +DROP TABLE transactional_drop_shards; +ERROR: shard drop operations must not appear in transaction blocks containing other distributed modifications +CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" +PL/pgSQL function citus_drop_trigger() line 21 at PERFORM +ROLLBACK; +-- verify metadata is not deleted +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid; + shardid +--------- + 1410005 +(1 row) + +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid) +ORDER BY + shardid; + shardid | shardstate | nodename | nodeport +---------+------------+-----------+---------- + 1410005 | 1 | localhost | 57637 + 1410005 | 1 | localhost | 57638 +(2 rows) + +-- verify table is not dropped +\d transactional_drop_shards; +Table "public.transactional_drop_shards" + Column | Type | Modifiers +---------+---------+----------- + column1 | integer | + +-- verify shards are not dropped +\c - - - :worker_1_port +\d transactional_drop_shards_*; +Table "public.transactional_drop_shards_1410005" + Column | Type | Modifiers +---------+---------+----------- + column1 | integer | + +\c - - - :master_port +-- test master_apply_delete_command in a transaction after insertion +BEGIN; +INSERT INTO transactional_drop_shards VALUES (1); +SELECT master_apply_delete_command('DELETE FROM transactional_drop_shards'); +ERROR: shard drop operations must not appear in transaction blocks containing other distributed modifications +ROLLBACK; +-- verify metadata is not deleted +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid; + shardid +--------- + 1410005 +(1 row) + +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid) +ORDER BY + shardid; + shardid | shardstate | nodename | nodeport +---------+------------+-----------+---------- + 1410005 | 1 | localhost | 57637 + 1410005 | 1 | localhost | 57638 +(2 rows) + +-- verify shards are not dropped +\c - - - :worker_1_port +\d transactional_drop_shards_*; +Table "public.transactional_drop_shards_1410005" + Column | Type | Modifiers +---------+---------+----------- + column1 | integer | + +-- test DROP table with failing worker +CREATE FUNCTION fail_drop_table() RETURNS event_trigger AS $fdt$ + BEGIN + RAISE 'illegal value'; + END; +$fdt$ LANGUAGE plpgsql; +CREATE EVENT TRIGGER fail_drop_table ON sql_drop EXECUTE PROCEDURE fail_drop_table(); +\c - - - :master_port +\set VERBOSITY terse +DROP TABLE transactional_drop_shards; +ERROR: illegal value +\set VERBOSITY default +-- verify metadata is not deleted +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid; + shardid +--------- + 1410005 +(1 row) + +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid) +ORDER BY + shardid; + shardid | shardstate | nodename | nodeport +---------+------------+-----------+---------- + 1410005 | 1 | localhost | 57637 + 1410005 | 1 | localhost | 57638 +(2 rows) + +-- verify table is not dropped +\d transactional_drop_shards; +Table "public.transactional_drop_shards" + Column | Type | Modifiers +---------+---------+----------- + column1 | integer | + +-- verify shards are not dropped +\c - - - :worker_1_port +\d transactional_drop_shards_*; +Table "public.transactional_drop_shards_1410005" + Column | Type | Modifiers +---------+---------+----------- + column1 | integer | + +\c - - - :master_port +-- test DROP reference table with failing worker +CREATE TABLE transactional_drop_reference(column1 int); +SELECT create_reference_table('transactional_drop_reference'); + create_reference_table +------------------------ + +(1 row) + +\set VERBOSITY terse +DROP TABLE transactional_drop_reference; +ERROR: illegal value +\set VERBOSITY default +-- verify metadata is not deleted +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_reference'::regclass ORDER BY shardid; + shardid +--------- + 1410006 +(1 row) + +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_reference'::regclass ORDER BY shardid) +ORDER BY + shardid, nodename, nodeport; + shardid | shardstate | nodename | nodeport +---------+------------+-----------+---------- + 1410006 | 1 | localhost | 57637 + 1410006 | 1 | localhost | 57638 +(2 rows) + +-- verify table is not dropped +\d transactional_drop_reference; +Table "public.transactional_drop_reference" + Column | Type | Modifiers +---------+---------+----------- + column1 | integer | + +-- verify shards are not dropped +\c - - - :worker_1_port +\d transactional_drop_reference*; +Table "public.transactional_drop_reference_1410006" + Column | Type | Modifiers +---------+---------+----------- + column1 | integer | + +\c - - - :master_port +-- test master_apply_delete_command table with failing worker +\set VERBOSITY terse +SELECT master_apply_delete_command('DELETE FROM transactional_drop_shards'); +ERROR: illegal value +\set VERBOSITY default +-- verify metadata is not deleted +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid; + shardid +--------- + 1410005 +(1 row) + +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid) +ORDER BY + shardid; + shardid | shardstate | nodename | nodeport +---------+------------+-----------+---------- + 1410005 | 1 | localhost | 57637 + 1410005 | 1 | localhost | 57638 +(2 rows) + +-- verify shards are not dropped +\c - - - :worker_1_port +\d transactional_drop_shards_*; +Table "public.transactional_drop_shards_1410005" + Column | Type | Modifiers +---------+---------+----------- + column1 | integer | + +DROP EVENT TRIGGER fail_drop_table; +\c - - - :master_port +-- test with SERIAL column + with more shards +SET citus.shard_count TO 8; +CREATE TABLE transactional_drop_serial(column1 int, column2 SERIAL); +SELECT create_distributed_table('transactional_drop_serial', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +-- test DROP TABLE(ergo master_drop_all_shards) in transaction, then ROLLBACK +BEGIN; +DROP TABLE transactional_drop_serial; +ROLLBACK; +-- verify metadata is not deleted +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_serial'::regclass ORDER BY shardid; + shardid +--------- + 1410007 + 1410008 + 1410009 + 1410010 + 1410011 + 1410012 + 1410013 + 1410014 +(8 rows) + +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_serial'::regclass ORDER BY shardid) +ORDER BY + shardid, nodename, nodeport; + shardid | shardstate | nodename | nodeport +---------+------------+-----------+---------- + 1410007 | 1 | localhost | 57637 + 1410007 | 1 | localhost | 57638 + 1410008 | 1 | localhost | 57637 + 1410008 | 1 | localhost | 57638 + 1410009 | 1 | localhost | 57637 + 1410009 | 1 | localhost | 57638 + 1410010 | 1 | localhost | 57637 + 1410010 | 1 | localhost | 57638 + 1410011 | 1 | localhost | 57637 + 1410011 | 1 | localhost | 57638 + 1410012 | 1 | localhost | 57637 + 1410012 | 1 | localhost | 57638 + 1410013 | 1 | localhost | 57637 + 1410013 | 1 | localhost | 57638 + 1410014 | 1 | localhost | 57637 + 1410014 | 1 | localhost | 57638 +(16 rows) + +-- verify table is not dropped +\d transactional_drop_serial; + Table "public.transactional_drop_serial" + Column | Type | Modifiers +---------+---------+----------------------------------------------------------------------------- + column1 | integer | + column2 | integer | not null default nextval('transactional_drop_serial_column2_seq'::regclass) + +-- verify shards and sequence are not dropped +\c - - - :worker_1_port +\d transactional_drop_serial_1410006; +\ds transactional_drop_serial_column2_seq + List of relations + Schema | Name | Type | Owner +--------+---------------------------------------+----------+---------- + public | transactional_drop_serial_column2_seq | sequence | postgres +(1 row) + +\c - - - :master_port +-- test DROP TABLE(ergo master_drop_all_shards) in transaction, then COMMIT +BEGIN; +DROP TABLE transactional_drop_serial; +COMMIT; +-- verify metadata is deleted +SELECT shardid FROM pg_dist_shard WHERE shardid IN (1410007, 1410008, 1410009, 1410010, 1410011, 1410012, 1410013, 1410014) ORDER BY shardid; + shardid +--------- +(0 rows) + +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (1410007, 1410008, 1410009, 1410010, 1410011, 1410012, 1410013, 1410014) +ORDER BY + shardid, nodename, nodeport; + shardid | shardstate | nodename | nodeport +---------+------------+----------+---------- +(0 rows) + +-- verify table is dropped +\d transactional_drop_serial; +-- verify shards and sequence are dropped +\c - - - :worker_1_port +\d transactional_drop_serial_1410006; +\ds transactional_drop_serial_column2_seq + List of relations + Schema | Name | Type | Owner +--------+------+------+------- +(0 rows) + +\c - - - :master_port +-- test with MX, DROP TABLE, then ROLLBACK +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 4; +CREATE TABLE transactional_drop_mx(column1 int); +SELECT create_distributed_table('transactional_drop_mx', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='transactional_drop_mx'::regclass; +-- make worker 1 receive metadata changes +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +-- see metadata is propogated to the worker +\c - - - :worker_1_port +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_mx'::regclass ORDER BY shardid; + shardid +--------- + 1410015 + 1410016 + 1410017 + 1410018 +(4 rows) + +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_mx'::regclass ORDER BY shardid) +ORDER BY + shardid, nodename, nodeport; + shardid | shardstate | nodename | nodeport +---------+------------+-----------+---------- + 1410015 | 1 | localhost | 57637 + 1410016 | 1 | localhost | 57638 + 1410017 | 1 | localhost | 57637 + 1410018 | 1 | localhost | 57638 +(4 rows) + +\c - - - :master_port +BEGIN; +DROP TABLE transactional_drop_mx; +ROLLBACK; +-- verify metadata is not deleted +\c - - - :worker_1_port +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_mx'::regclass ORDER BY shardid; + shardid +--------- + 1410015 + 1410016 + 1410017 + 1410018 +(4 rows) + +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_mx'::regclass ORDER BY shardid) +ORDER BY + shardid, nodename, nodeport; + shardid | shardstate | nodename | nodeport +---------+------------+-----------+---------- + 1410015 | 1 | localhost | 57637 + 1410016 | 1 | localhost | 57638 + 1410017 | 1 | localhost | 57637 + 1410018 | 1 | localhost | 57638 +(4 rows) + +-- test with MX, DROP TABLE, then COMMIT +\c - - - :master_port +BEGIN; +DROP TABLE transactional_drop_mx; +COMMIT; +-- verify metadata is deleted +\c - - - :worker_1_port +SELECT shardid FROM pg_dist_shard WHERE shardid IN (1410015, 1410016, 1410017, 1410018) ORDER BY shardid; + shardid +--------- +(0 rows) + +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (1410015, 1410016, 1410017, 1410018) +ORDER BY + shardid, nodename, nodeport; + shardid | shardstate | nodename | nodeport +---------+------------+----------+---------- +(0 rows) + +\c - - - :master_port +-- clean the workspace +DROP TABLE transactional_drop_shards, transactional_drop_reference; +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + diff --git a/src/test/regress/expected/multi_truncate.out b/src/test/regress/expected/multi_truncate.out index 7f7349d4f..bb7d14d0d 100644 --- a/src/test/regress/expected/multi_truncate.out +++ b/src/test/regress/expected/multi_truncate.out @@ -68,11 +68,8 @@ SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::r --------- (0 rows) --- command can not be run inside transaction +-- command can run inside transaction BEGIN; TRUNCATE TABLE test_truncate_append; COMMIT; -ERROR: DROP distributed table cannot run inside a transaction block -CONTEXT: SQL statement "SELECT master_drop_all_shards(TG_RELID, TG_TABLE_SCHEMA, TG_TABLE_NAME)" -PL/pgSQL function citus_truncate_trigger() line 13 at PERFORM DROP TABLE test_truncate_append; -- -- truncate for range distribution diff --git a/src/test/regress/input/multi_master_delete_protocol.source b/src/test/regress/input/multi_master_delete_protocol.source index 06fb1cdf3..a3cc63eb3 100644 --- a/src/test/regress/input/multi_master_delete_protocol.source +++ b/src/test/regress/input/multi_master_delete_protocol.source @@ -52,7 +52,8 @@ SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol WHERE c_custkey > 1000'); SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol'); --- Verify that master_apply_delete_command cannot be called in a transaction block +-- Verify that master_apply_delete_command can be called in a transaction block +SELECT 1 AS one FROM master_create_empty_shard('customer_delete_protocol'); BEGIN; SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol'); -ROLLBACK; +COMMIT; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index f4ad6b01b..39593a977 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -215,3 +215,8 @@ test: multi_foreign_key test: multi_upgrade_reference_table test: multi_replicate_reference_table test: multi_remove_node_reference_table + +# ---------- +# multi_transactional_drop_shards tests for dropping shards using connection API +# ---------- +test: multi_transactional_drop_shards diff --git a/src/test/regress/output/multi_master_delete_protocol.source b/src/test/regress/output/multi_master_delete_protocol.source index 10eafe8ad..7ccee403b 100644 --- a/src/test/regress/output/multi_master_delete_protocol.source +++ b/src/test/regress/output/multi_master_delete_protocol.source @@ -94,8 +94,18 @@ SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol'); 1 (1 row) --- Verify that master_apply_delete_command cannot be called in a transaction block +-- Verify that master_apply_delete_command can be called in a transaction block +SELECT 1 AS one FROM master_create_empty_shard('customer_delete_protocol'); + one +----- + 1 +(1 row) + BEGIN; SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol'); -ERROR: master_apply_delete_command cannot run inside a transaction block -ROLLBACK; + master_apply_delete_command +----------------------------- + 1 +(1 row) + +COMMIT; diff --git a/src/test/regress/sql/multi_remove_node_reference_table.sql b/src/test/regress/sql/multi_remove_node_reference_table.sql index 37d9f8aa3..9c9769bde 100644 --- a/src/test/regress/sql/multi_remove_node_reference_table.sql +++ b/src/test/regress/sql/multi_remove_node_reference_table.sql @@ -392,11 +392,47 @@ SELECT master_add_node('localhost', :worker_2_port); -- test DROP table after removing a node in a transaction + +-- status before master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'remove_node_reference_table'::regclass); + BEGIN; SELECT master_remove_node('localhost', :worker_2_port); DROP TABLE remove_node_reference_table; -ROLLBACK; +COMMIT; +-- status after master_remove_node +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * FROM pg_dist_colocation WHERE colocationid = 1380000; + +-- re-add the node for next tests +SELECT master_add_node('localhost', :worker_2_port); + +-- re-create remove_node_reference_table +CREATE TABLE remove_node_reference_table(column1 int); +SELECT create_reference_table('remove_node_reference_table'); -- test removing a node while there is a reference table at another schema CREATE SCHEMA remove_node_reference_table_schema; diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index 86c5c8ea0..7f9813b77 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -345,14 +345,39 @@ DROP TABLE replicate_reference_table_ddl; CREATE TABLE replicate_reference_table_drop(column1 int); SELECT create_reference_table('replicate_reference_table_drop'); +-- status before master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'replicate_reference_table_drop'::regclass); + BEGIN; SELECT master_add_node('localhost', :worker_2_port); DROP TABLE replicate_reference_table_drop; -ROLLBACK; +COMMIT; -DROP TABLE replicate_reference_table_drop; +-- status after master_add_node +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * FROM pg_dist_colocation WHERE colocationid = 1370009; -- test adding a node while there is a reference table at another schema +SELECT master_remove_node('localhost', :worker_2_port); + CREATE SCHEMA replicate_reference_table_schema; CREATE TABLE replicate_reference_table_schema.table1(column1 int); SELECT create_reference_table('replicate_reference_table_schema.table1'); diff --git a/src/test/regress/sql/multi_table_ddl.sql b/src/test/regress/sql/multi_table_ddl.sql index 166d7cdbd..e1f737e83 100644 --- a/src/test/regress/sql/multi_table_ddl.sql +++ b/src/test/regress/sql/multi_table_ddl.sql @@ -20,13 +20,17 @@ ALTER TABLE testtableddl ALTER COLUMN distributecol TYPE text; -- verify that the distribution column can't be dropped ALTER TABLE testtableddl DROP COLUMN distributecol; --- verify that the table cannot be dropped in a transaction block +-- verify that the table can be dropped in a transaction block \set VERBOSITY terse BEGIN; DROP TABLE testtableddl; -ROLLBACK; +COMMIT; \set VERBOSITY default +-- recreate testtableddl +CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL); +SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append'); + -- verify that the table can be dropped DROP TABLE testtableddl; diff --git a/src/test/regress/sql/multi_transactional_drop_shards.sql b/src/test/regress/sql/multi_transactional_drop_shards.sql new file mode 100644 index 000000000..37a6924a1 --- /dev/null +++ b/src/test/regress/sql/multi_transactional_drop_shards.sql @@ -0,0 +1,369 @@ +-- +-- MULTI_TRANSACTIONAL_DROP_SHARDS +-- +-- Tests that check the metadata returned by the master node. + + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1410000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1410000; + +SET citus.shard_count TO 4; + +-- test DROP TABLE(ergo master_drop_all_shards) in transaction, then ROLLBACK +CREATE TABLE transactional_drop_shards(column1 int); +SELECT create_distributed_table('transactional_drop_shards', 'column1'); + +BEGIN; +DROP TABLE transactional_drop_shards; +ROLLBACK; + +-- verify metadata is not deleted +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid; +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid) +ORDER BY + shardid; + +-- verify table is not dropped +\d transactional_drop_shards; + +-- verify shards are not dropped +\c - - - :worker_1_port +\d transactional_drop_shards_*; +\c - - - :master_port + + +-- test DROP TABLE(ergo master_drop_all_shards) in transaction, then COMMIT +BEGIN; +DROP TABLE transactional_drop_shards; +COMMIT; + +-- verify metadata is deleted +SELECT shardid FROM pg_dist_shard WHERE shardid IN (1410000, 1410001, 1410002, 1410003) ORDER BY shardid; +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (1410000, 1410001, 1410002, 1410003) +ORDER BY + shardid; + +-- verify table is dropped +\d transactional_drop_shards; + +-- verify shards are dropped +\c - - - :worker_1_port +\d transactional_drop_shards_*; +\c - - - :master_port + + +-- test master_delete_protocol in transaction, then ROLLBACK +CREATE TABLE transactional_drop_shards(column1 int); +SELECT create_distributed_table('transactional_drop_shards', 'column1', 'append'); +SELECT master_create_empty_shard('transactional_drop_shards'); + +BEGIN; +SELECT master_apply_delete_command('DELETE FROM transactional_drop_shards'); +ROLLBACK; + +-- verify metadata is not deleted +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid; +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid) +ORDER BY + shardid; + +-- verify shards are not dropped +\c - - - :worker_1_port +\d transactional_drop_shards_*; +\c - - - :master_port + + +-- test master_delete_protocol in transaction, then COMMIT +BEGIN; +SELECT master_apply_delete_command('DELETE FROM transactional_drop_shards'); +COMMIT; + +-- verify metadata is deleted +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid; +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid) +ORDER BY + shardid; + +-- verify shards are dropped +\c - - - :worker_1_port +\d transactional_drop_shards_*; +\c - - - :master_port + + +-- test DROP table in a transaction after insertion +SELECT master_create_empty_shard('transactional_drop_shards'); + +BEGIN; +INSERT INTO transactional_drop_shards VALUES (1); +DROP TABLE transactional_drop_shards; +ROLLBACK; + +-- verify metadata is not deleted +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid; +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid) +ORDER BY + shardid; + +-- verify table is not dropped +\d transactional_drop_shards; + +-- verify shards are not dropped +\c - - - :worker_1_port +\d transactional_drop_shards_*; +\c - - - :master_port + + +-- test master_apply_delete_command in a transaction after insertion +BEGIN; +INSERT INTO transactional_drop_shards VALUES (1); +SELECT master_apply_delete_command('DELETE FROM transactional_drop_shards'); +ROLLBACK; + +-- verify metadata is not deleted +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid; +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid) +ORDER BY + shardid; + +-- verify shards are not dropped +\c - - - :worker_1_port +\d transactional_drop_shards_*; + + +-- test DROP table with failing worker +CREATE FUNCTION fail_drop_table() RETURNS event_trigger AS $fdt$ + BEGIN + RAISE 'illegal value'; + END; +$fdt$ LANGUAGE plpgsql; + +CREATE EVENT TRIGGER fail_drop_table ON sql_drop EXECUTE PROCEDURE fail_drop_table(); + +\c - - - :master_port + +\set VERBOSITY terse +DROP TABLE transactional_drop_shards; +\set VERBOSITY default + +-- verify metadata is not deleted +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid; +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid) +ORDER BY + shardid; + +-- verify table is not dropped +\d transactional_drop_shards; + +-- verify shards are not dropped +\c - - - :worker_1_port +\d transactional_drop_shards_*; +\c - - - :master_port + + +-- test DROP reference table with failing worker +CREATE TABLE transactional_drop_reference(column1 int); +SELECT create_reference_table('transactional_drop_reference'); + +\set VERBOSITY terse +DROP TABLE transactional_drop_reference; +\set VERBOSITY default + +-- verify metadata is not deleted +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_reference'::regclass ORDER BY shardid; +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_reference'::regclass ORDER BY shardid) +ORDER BY + shardid, nodename, nodeport; + +-- verify table is not dropped +\d transactional_drop_reference; + +-- verify shards are not dropped +\c - - - :worker_1_port +\d transactional_drop_reference*; +\c - - - :master_port + + +-- test master_apply_delete_command table with failing worker +\set VERBOSITY terse +SELECT master_apply_delete_command('DELETE FROM transactional_drop_shards'); +\set VERBOSITY default + +-- verify metadata is not deleted +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid; +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid) +ORDER BY + shardid; + +-- verify shards are not dropped +\c - - - :worker_1_port +\d transactional_drop_shards_*; +DROP EVENT TRIGGER fail_drop_table; +\c - - - :master_port + + +-- test with SERIAL column + with more shards +SET citus.shard_count TO 8; +CREATE TABLE transactional_drop_serial(column1 int, column2 SERIAL); +SELECT create_distributed_table('transactional_drop_serial', 'column1'); + +-- test DROP TABLE(ergo master_drop_all_shards) in transaction, then ROLLBACK +BEGIN; +DROP TABLE transactional_drop_serial; +ROLLBACK; + +-- verify metadata is not deleted +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_serial'::regclass ORDER BY shardid; +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_serial'::regclass ORDER BY shardid) +ORDER BY + shardid, nodename, nodeport; + +-- verify table is not dropped +\d transactional_drop_serial; + +-- verify shards and sequence are not dropped +\c - - - :worker_1_port +\d transactional_drop_serial_1410006; +\ds transactional_drop_serial_column2_seq +\c - - - :master_port + + +-- test DROP TABLE(ergo master_drop_all_shards) in transaction, then COMMIT +BEGIN; +DROP TABLE transactional_drop_serial; +COMMIT; + +-- verify metadata is deleted +SELECT shardid FROM pg_dist_shard WHERE shardid IN (1410007, 1410008, 1410009, 1410010, 1410011, 1410012, 1410013, 1410014) ORDER BY shardid; +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (1410007, 1410008, 1410009, 1410010, 1410011, 1410012, 1410013, 1410014) +ORDER BY + shardid, nodename, nodeport; + +-- verify table is dropped +\d transactional_drop_serial; + +-- verify shards and sequence are dropped +\c - - - :worker_1_port +\d transactional_drop_serial_1410006; +\ds transactional_drop_serial_column2_seq +\c - - - :master_port + + +-- test with MX, DROP TABLE, then ROLLBACK +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 4; + +CREATE TABLE transactional_drop_mx(column1 int); +SELECT create_distributed_table('transactional_drop_mx', 'column1'); + +UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='transactional_drop_mx'::regclass; + +-- make worker 1 receive metadata changes +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + +-- see metadata is propogated to the worker +\c - - - :worker_1_port +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_mx'::regclass ORDER BY shardid; +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_mx'::regclass ORDER BY shardid) +ORDER BY + shardid, nodename, nodeport; + +\c - - - :master_port +BEGIN; +DROP TABLE transactional_drop_mx; +ROLLBACK; + +-- verify metadata is not deleted +\c - - - :worker_1_port +SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_mx'::regclass ORDER BY shardid; +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_mx'::regclass ORDER BY shardid) +ORDER BY + shardid, nodename, nodeport; + +-- test with MX, DROP TABLE, then COMMIT +\c - - - :master_port +BEGIN; +DROP TABLE transactional_drop_mx; +COMMIT; + +-- verify metadata is deleted +\c - - - :worker_1_port +SELECT shardid FROM pg_dist_shard WHERE shardid IN (1410015, 1410016, 1410017, 1410018) ORDER BY shardid; +SELECT + shardid, shardstate, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (1410015, 1410016, 1410017, 1410018) +ORDER BY + shardid, nodename, nodeport; + +\c - - - :master_port + +-- clean the workspace +DROP TABLE transactional_drop_shards, transactional_drop_reference; +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); diff --git a/src/test/regress/sql/multi_truncate.sql b/src/test/regress/sql/multi_truncate.sql index b1a7399ff..c17dfb609 100644 --- a/src/test/regress/sql/multi_truncate.sql +++ b/src/test/regress/sql/multi_truncate.sql @@ -41,7 +41,7 @@ SELECT count(*) FROM test_truncate_append; -- verify no shard exists anymore SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass; --- command can not be run inside transaction +-- command can run inside transaction BEGIN; TRUNCATE TABLE test_truncate_append; COMMIT; DROP TABLE test_truncate_append;