Merge pull request #1136 from citusdata/convert_drop_shards_to_use_new_api

Convert drop shards to use new API
pull/1124/head
Burak Yücesoy 2017-01-23 21:16:16 +03:00 committed by GitHub
commit e796d5cb2b
17 changed files with 1320 additions and 92 deletions

View File

@ -837,7 +837,18 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry,
if (placementEntry->failed) if (placementEntry->failed)
{ {
UpdateShardPlacementState(placementEntry->key.placementId, FILE_INACTIVE); uint64 shardId = shardEntry->key.shardId;
uint64 placementId = placementEntry->key.placementId;
ShardPlacement *shardPlacement = LoadShardPlacement(shardId, placementId);
/*
* We only set shard state if its current state is FILE_FINALIZED, which
* prevents overwriting shard state if it is already set at somewhere else.
*/
if (shardPlacement->shardState == FILE_FINALIZED)
{
UpdateShardPlacementState(placementEntry->key.placementId, FILE_INACTIVE);
}
} }
} }

View File

@ -16,6 +16,7 @@
#include "postgres.h" #include "postgres.h"
#include "c.h" #include "c.h"
#include "fmgr.h" #include "fmgr.h"
#include "libpq-fe.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "port.h" #include "port.h"
@ -24,6 +25,7 @@
#include "access/xact.h" #include "access/xact.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "distributed/connection_management.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_sync.h" #include "distributed/metadata_sync.h"
#include "distributed/multi_client_executor.h" #include "distributed/multi_client_executor.h"
@ -34,7 +36,9 @@
#include "distributed/multi_utility.h" #include "distributed/multi_utility.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard.h"
#include "distributed/placement_connection.h"
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
#include "distributed/remote_commands.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h" #include "distributed/worker_transaction.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
@ -106,10 +110,8 @@ master_apply_delete_command(PG_FUNCTION_ARGS)
bool dontWait = false; bool dontWait = false;
char partitionMethod = 0; char partitionMethod = 0;
bool failOK = false; bool failOK = false;
bool isTopLevel = true;
EnsureSchemaNode(); EnsureSchemaNode();
PreventTransactionChain(isTopLevel, "master_apply_delete_command");
queryTreeNode = ParseTreeNode(queryString); queryTreeNode = ParseTreeNode(queryString);
if (!IsA(queryTreeNode, DeleteStmt)) if (!IsA(queryTreeNode, DeleteStmt))
@ -200,7 +202,6 @@ master_drop_all_shards(PG_FUNCTION_ARGS)
text *schemaNameText = PG_GETARG_TEXT_P(1); text *schemaNameText = PG_GETARG_TEXT_P(1);
text *relationNameText = PG_GETARG_TEXT_P(2); text *relationNameText = PG_GETARG_TEXT_P(2);
bool isTopLevel = true;
List *shardIntervalList = NIL; List *shardIntervalList = NIL;
int droppedShardCount = 0; int droppedShardCount = 0;
@ -208,7 +209,6 @@ master_drop_all_shards(PG_FUNCTION_ARGS)
char *relationName = text_to_cstring(relationNameText); char *relationName = text_to_cstring(relationNameText);
EnsureSchemaNode(); EnsureSchemaNode();
PreventTransactionChain(isTopLevel, "DROP distributed table");
CheckTableSchemaNameForDrop(relationId, &schemaName, &relationName); CheckTableSchemaNameForDrop(relationId, &schemaName, &relationName);
@ -324,14 +324,26 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
ListCell *shardIntervalCell = NULL; ListCell *shardIntervalCell = NULL;
int droppedShardCount = 0; int droppedShardCount = 0;
if (XactModificationLevel != XACT_MODIFICATION_NONE)
{
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("shard drop operations must not appear in "
"transaction blocks containing other distributed "
"modifications")));
}
BeginOrContinueCoordinatedTransaction();
/* At this point we intentionally decided to not use 2PC for reference tables */
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
{
CoordinatedTransactionUse2PC();
}
foreach(shardIntervalCell, deletableShardIntervalList) foreach(shardIntervalCell, deletableShardIntervalList)
{ {
List *shardPlacementList = NIL; List *shardPlacementList = NIL;
List *droppedPlacementList = NIL;
List *lingeringPlacementList = NIL;
ListCell *shardPlacementCell = NULL; ListCell *shardPlacementCell = NULL;
ListCell *droppedPlacementCell = NULL;
ListCell *lingeringPlacementCell = NULL;
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId; uint64 shardId = shardInterval->shardId;
char *quotedShardName = NULL; char *quotedShardName = NULL;
@ -350,8 +362,10 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
(ShardPlacement *) lfirst(shardPlacementCell); (ShardPlacement *) lfirst(shardPlacementCell);
char *workerName = shardPlacement->nodeName; char *workerName = shardPlacement->nodeName;
uint32 workerPort = shardPlacement->nodePort; uint32 workerPort = shardPlacement->nodePort;
bool dropSuccessful = false;
StringInfo workerDropQuery = makeStringInfo(); StringInfo workerDropQuery = makeStringInfo();
MultiConnection *connection = NULL;
uint32 connectionFlags = FOR_DDL;
char *extensionOwner = CitusExtensionOwnerName();
char storageType = shardInterval->storageType; char storageType = shardInterval->storageType;
if (storageType == SHARD_STORAGE_TABLE) if (storageType == SHARD_STORAGE_TABLE)
@ -366,58 +380,34 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
quotedShardName); quotedShardName);
} }
dropSuccessful = ExecuteRemoteCommand(workerName, workerPort, connection = GetNodeUserDatabaseConnection(connectionFlags, workerName,
workerDropQuery); workerPort, extensionOwner, NULL);
if (dropSuccessful)
{
droppedPlacementList = lappend(droppedPlacementList, shardPlacement);
}
else
{
lingeringPlacementList = lappend(lingeringPlacementList, shardPlacement);
}
}
/* make sure we don't process cancel signals */ RemoteTransactionBeginIfNecessary(connection);
HOLD_INTERRUPTS();
foreach(droppedPlacementCell, droppedPlacementList) if (PQstatus(connection->pgConn) != CONNECTION_OK)
{ {
ShardPlacement *placement = (ShardPlacement *) lfirst(droppedPlacementCell); uint64 placementId = shardPlacement->placementId;
char *workerName = placement->nodeName;
uint32 workerPort = placement->nodePort; ereport(WARNING, (errmsg("could not connect to shard \"%s\" on node "
"\"%s:%u\"", shardRelationName, workerName,
workerPort),
errdetail("Marking this shard placement for "
"deletion")));
UpdateShardPlacementState(placementId, FILE_TO_DELETE);
continue;
}
MarkRemoteTransactionCritical(connection);
ExecuteCriticalRemoteCommand(connection, workerDropQuery->data);
DeleteShardPlacementRow(shardId, workerName, workerPort); DeleteShardPlacementRow(shardId, workerName, workerPort);
} }
/* mark shard placements that we couldn't drop as to be deleted */
foreach(lingeringPlacementCell, lingeringPlacementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(lingeringPlacementCell);
uint64 placementId = placement->placementId;
char *workerName = placement->nodeName;
uint32 workerPort = placement->nodePort;
uint64 oldShardLength = placement->shardLength;
DeleteShardPlacementRow(shardId, workerName, workerPort);
InsertShardPlacementRow(shardId, placementId, FILE_TO_DELETE, oldShardLength,
workerName, workerPort);
ereport(WARNING, (errmsg("could not delete shard \"%s\" on node \"%s:%u\"",
shardRelationName, workerName, workerPort),
errdetail("Marking this shard placement for deletion")));
}
DeleteShardRow(shardId); DeleteShardRow(shardId);
if (QueryCancelPending)
{
ereport(WARNING, (errmsg("cancel requests are ignored during shard "
"deletion")));
QueryCancelPending = false;
}
RESUME_INTERRUPTS();
} }
droppedShardCount = list_length(deletableShardIntervalList); droppedShardCount = list_length(deletableShardIntervalList);

View File

@ -245,6 +245,48 @@ LoadShardInterval(uint64 shardId)
} }
/*
* LoadShardPlacement returns the, cached, metadata about a shard placement.
*
* The return value is a copy of the cached ShardPlacement struct and may
* therefore be modified and/or freed.
*/
ShardPlacement *
LoadShardPlacement(uint64 shardId, uint64 placementId)
{
ShardCacheEntry *shardEntry = NULL;
DistTableCacheEntry *tableEntry = NULL;
ShardPlacement *placementArray = NULL;
int numberOfPlacements = 0;
int i = 0;
shardEntry = LookupShardCacheEntry(shardId);
tableEntry = shardEntry->tableEntry;
/* the offset better be in a valid range */
Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength);
placementArray = tableEntry->arrayOfPlacementArrays[shardEntry->shardIndex];
numberOfPlacements = tableEntry->arrayOfPlacementArrayLengths[shardEntry->shardIndex];
for (i = 0; i < numberOfPlacements; i++)
{
if (placementArray[i].placementId == placementId)
{
ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement);
CopyShardPlacement(&placementArray[i], shardPlacement);
return shardPlacement;
}
}
ereport(ERROR, (errmsg("could not find valid entry for shard placement "
UINT64_FORMAT, placementId)));
}
/* /*
* ShardPlacementList returns the list of placements for the given shard from * ShardPlacementList returns the list of placements for the given shard from
* the cache. * the cache.

View File

@ -59,6 +59,7 @@ typedef struct
extern bool IsDistributedTable(Oid relationId); extern bool IsDistributedTable(Oid relationId);
extern List * DistributedTableList(void); extern List * DistributedTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId); extern ShardInterval * LoadShardInterval(uint64 shardId);
extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
extern int GetLocalGroupId(void); extern int GetLocalGroupId(void);
extern List * DistTableOidList(void); extern List * DistTableOidList(void);

View File

@ -648,6 +648,35 @@ NOTICE: Replicating reference table "remove_node_reference_table" to all worker
(1 row) (1 row)
-- test DROP table after removing a node in a transaction -- test DROP table after removing a node in a transaction
-- status before master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1380000 | 1 | 2 | 0
(1 row)
BEGIN; BEGIN;
SELECT master_remove_node('localhost', :worker_2_port); SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node master_remove_node
@ -656,10 +685,44 @@ SELECT master_remove_node('localhost', :worker_2_port);
(1 row) (1 row)
DROP TABLE remove_node_reference_table; DROP TABLE remove_node_reference_table;
ERROR: DROP distributed table cannot run inside a transaction block COMMIT;
CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" -- status after master_remove_node
PL/pgSQL function citus_drop_trigger() line 21 at PERFORM SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
ROLLBACK; count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT * FROM pg_dist_colocation WHERE colocationid = 1380000;
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
(0 rows)
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
---------------------------------------------
(1380005,1380005,localhost,57638,default,f)
(1 row)
-- re-create remove_node_reference_table
CREATE TABLE remove_node_reference_table(column1 int);
SELECT create_reference_table('remove_node_reference_table');
create_reference_table
------------------------
(1 row)
-- test removing a node while there is a reference table at another schema -- test removing a node while there is a reference table at another schema
CREATE SCHEMA remove_node_reference_table_schema; CREATE SCHEMA remove_node_reference_table_schema;
CREATE TABLE remove_node_reference_table_schema.table1(column1 int); CREATE TABLE remove_node_reference_table_schema.table1(column1 int);
@ -686,8 +749,8 @@ ORDER BY
shardid; shardid;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
1380001 | 1 | 0 | localhost | 57638 1380001 | 1 | 0 | localhost | 57638
1380002 | 1 | 0 | localhost | 57638
(2 rows) (2 rows)
SELECT * SELECT *
@ -698,7 +761,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass); WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------
1380000 | 1 | 2 | 0 1380001 | 1 | 2 | 0
(1 row) (1 row)
@ -717,8 +780,8 @@ WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
1380001 | 1 | 0 | localhost | 57638 1380001 | 1 | 0 | localhost | 57638
1380002 | 1 | 0 | localhost | 57638
(2 rows) (2 rows)
@ -754,7 +817,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass); WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------
1380000 | 1 | 1 | 0 1380001 | 1 | 1 | 0
(1 row) (1 row)
\c - - - :worker_1_port \c - - - :worker_1_port
@ -783,7 +846,7 @@ NOTICE: Replicating reference table "remove_node_reference_table" to all worker
NOTICE: Replicating reference table "table1" to all workers NOTICE: Replicating reference table "table1" to all workers
master_add_node master_add_node
--------------------------------------------- ---------------------------------------------
(1380005,1380005,localhost,57638,default,f) (1380006,1380006,localhost,57638,default,f)
(1 row) (1 row)
-- test with master_disable_node -- test with master_disable_node
@ -804,8 +867,8 @@ ORDER BY
shardid; shardid;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
1380001 | 1 | 0 | localhost | 57638 1380001 | 1 | 0 | localhost | 57638
1380002 | 1 | 0 | localhost | 57638
(2 rows) (2 rows)
SELECT * SELECT *
@ -816,7 +879,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'remove_node_reference_table'::regclass); WHERE logicalrelid = 'remove_node_reference_table'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------
1380000 | 1 | 2 | 0 1380001 | 1 | 2 | 0
(1 row) (1 row)
\c - - - :worker_1_port \c - - - :worker_1_port
@ -834,8 +897,8 @@ WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
1380001 | 1 | 0 | localhost | 57638 1380001 | 1 | 0 | localhost | 57638
1380002 | 1 | 0 | localhost | 57638
(2 rows) (2 rows)
@ -872,7 +935,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'remove_node_reference_table'::regclass); WHERE logicalrelid = 'remove_node_reference_table'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------
1380000 | 1 | 1 | 0 1380001 | 1 | 1 | 0
(1 row) (1 row)
\c - - - :worker_1_port \c - - - :worker_1_port
@ -900,7 +963,7 @@ NOTICE: Replicating reference table "remove_node_reference_table" to all worker
NOTICE: Replicating reference table "table1" to all workers NOTICE: Replicating reference table "table1" to all workers
master_add_node master_add_node
--------------------------------------------- ---------------------------------------------
(1380006,1380006,localhost,57638,default,f) (1380007,1380007,localhost,57638,default,f)
(1 row) (1 row)
-- DROP tables to clean workspace -- DROP tables to clean workspace

View File

@ -526,6 +526,28 @@ SELECT create_reference_table('replicate_reference_table_drop');
(1 row) (1 row)
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_drop'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1370009 | 1 | 1 | 0
(1 row)
BEGIN; BEGIN;
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_drop" to all workers NOTICE: Replicating reference table "replicate_reference_table_drop" to all workers
@ -535,12 +557,30 @@ NOTICE: Replicating reference table "replicate_reference_table_drop" to all wor
(1 row) (1 row)
DROP TABLE replicate_reference_table_drop; DROP TABLE replicate_reference_table_drop;
ERROR: DROP distributed table cannot run inside a transaction block COMMIT;
CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" -- status after master_add_node
PL/pgSQL function citus_drop_trigger() line 21 at PERFORM SELECT
ROLLBACK; shardid, shardstate, shardlength, nodename, nodeport
DROP TABLE replicate_reference_table_drop; FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT * FROM pg_dist_colocation WHERE colocationid = 1370009;
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
(0 rows)
-- test adding a node while there is a reference table at another schema -- test adding a node while there is a reference table at another schema
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
CREATE SCHEMA replicate_reference_table_schema; CREATE SCHEMA replicate_reference_table_schema;
CREATE TABLE replicate_reference_table_schema.table1(column1 int); CREATE TABLE replicate_reference_table_schema.table1(column1 int);
SELECT create_reference_table('replicate_reference_table_schema.table1'); SELECT create_reference_table('replicate_reference_table_schema.table1');

View File

@ -22,13 +22,20 @@ ERROR: cannot execute ALTER TABLE command involving partition column
-- verify that the distribution column can't be dropped -- verify that the distribution column can't be dropped
ALTER TABLE testtableddl DROP COLUMN distributecol; ALTER TABLE testtableddl DROP COLUMN distributecol;
ERROR: cannot execute ALTER TABLE command involving partition column ERROR: cannot execute ALTER TABLE command involving partition column
-- verify that the table cannot be dropped in a transaction block -- verify that the table can be dropped in a transaction block
\set VERBOSITY terse \set VERBOSITY terse
BEGIN; BEGIN;
DROP TABLE testtableddl; DROP TABLE testtableddl;
ERROR: DROP distributed table cannot run inside a transaction block COMMIT;
ROLLBACK;
\set VERBOSITY default \set VERBOSITY default
-- recreate testtableddl
CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL);
SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append');
master_create_distributed_table
---------------------------------
(1 row)
-- verify that the table can be dropped -- verify that the table can be dropped
DROP TABLE testtableddl; DROP TABLE testtableddl;
-- verify that the table can dropped even if shards exist -- verify that the table can dropped even if shards exist

View File

@ -0,0 +1,627 @@
--
-- MULTI_TRANSACTIONAL_DROP_SHARDS
--
-- Tests that check the metadata returned by the master node.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1410000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1410000;
SET citus.shard_count TO 4;
-- test DROP TABLE(ergo master_drop_all_shards) in transaction, then ROLLBACK
CREATE TABLE transactional_drop_shards(column1 int);
SELECT create_distributed_table('transactional_drop_shards', 'column1');
create_distributed_table
--------------------------
(1 row)
BEGIN;
DROP TABLE transactional_drop_shards;
ROLLBACK;
-- verify metadata is not deleted
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid;
shardid
---------
1410000
1410001
1410002
1410003
(4 rows)
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid)
ORDER BY
shardid;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
1410000 | 1 | localhost | 57637
1410000 | 1 | localhost | 57638
1410001 | 1 | localhost | 57638
1410001 | 1 | localhost | 57637
1410002 | 1 | localhost | 57637
1410002 | 1 | localhost | 57638
1410003 | 1 | localhost | 57638
1410003 | 1 | localhost | 57637
(8 rows)
-- verify table is not dropped
\d transactional_drop_shards;
Table "public.transactional_drop_shards"
Column | Type | Modifiers
---------+---------+-----------
column1 | integer |
-- verify shards are not dropped
\c - - - :worker_1_port
\d transactional_drop_shards_*;
Table "public.transactional_drop_shards_1410000"
Column | Type | Modifiers
---------+---------+-----------
column1 | integer |
Table "public.transactional_drop_shards_1410001"
Column | Type | Modifiers
---------+---------+-----------
column1 | integer |
Table "public.transactional_drop_shards_1410002"
Column | Type | Modifiers
---------+---------+-----------
column1 | integer |
Table "public.transactional_drop_shards_1410003"
Column | Type | Modifiers
---------+---------+-----------
column1 | integer |
\c - - - :master_port
-- test DROP TABLE(ergo master_drop_all_shards) in transaction, then COMMIT
BEGIN;
DROP TABLE transactional_drop_shards;
COMMIT;
-- verify metadata is deleted
SELECT shardid FROM pg_dist_shard WHERE shardid IN (1410000, 1410001, 1410002, 1410003) ORDER BY shardid;
shardid
---------
(0 rows)
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (1410000, 1410001, 1410002, 1410003)
ORDER BY
shardid;
shardid | shardstate | nodename | nodeport
---------+------------+----------+----------
(0 rows)
-- verify table is dropped
\d transactional_drop_shards;
-- verify shards are dropped
\c - - - :worker_1_port
\d transactional_drop_shards_*;
\c - - - :master_port
-- test master_delete_protocol in transaction, then ROLLBACK
CREATE TABLE transactional_drop_shards(column1 int);
SELECT create_distributed_table('transactional_drop_shards', 'column1', 'append');
create_distributed_table
--------------------------
(1 row)
SELECT master_create_empty_shard('transactional_drop_shards');
master_create_empty_shard
---------------------------
1410004
(1 row)
BEGIN;
SELECT master_apply_delete_command('DELETE FROM transactional_drop_shards');
master_apply_delete_command
-----------------------------
1
(1 row)
ROLLBACK;
-- verify metadata is not deleted
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid;
shardid
---------
1410004
(1 row)
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid)
ORDER BY
shardid;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
1410004 | 1 | localhost | 57638
1410004 | 1 | localhost | 57637
(2 rows)
-- verify shards are not dropped
\c - - - :worker_1_port
\d transactional_drop_shards_*;
Table "public.transactional_drop_shards_1410004"
Column | Type | Modifiers
---------+---------+-----------
column1 | integer |
\c - - - :master_port
-- test master_delete_protocol in transaction, then COMMIT
BEGIN;
SELECT master_apply_delete_command('DELETE FROM transactional_drop_shards');
master_apply_delete_command
-----------------------------
1
(1 row)
COMMIT;
-- verify metadata is deleted
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid;
shardid
---------
(0 rows)
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid)
ORDER BY
shardid;
shardid | shardstate | nodename | nodeport
---------+------------+----------+----------
(0 rows)
-- verify shards are dropped
\c - - - :worker_1_port
\d transactional_drop_shards_*;
\c - - - :master_port
-- test DROP table in a transaction after insertion
SELECT master_create_empty_shard('transactional_drop_shards');
master_create_empty_shard
---------------------------
1410005
(1 row)
BEGIN;
INSERT INTO transactional_drop_shards VALUES (1);
DROP TABLE transactional_drop_shards;
ERROR: shard drop operations must not appear in transaction blocks containing other distributed modifications
CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)"
PL/pgSQL function citus_drop_trigger() line 21 at PERFORM
ROLLBACK;
-- verify metadata is not deleted
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid;
shardid
---------
1410005
(1 row)
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid)
ORDER BY
shardid;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
1410005 | 1 | localhost | 57637
1410005 | 1 | localhost | 57638
(2 rows)
-- verify table is not dropped
\d transactional_drop_shards;
Table "public.transactional_drop_shards"
Column | Type | Modifiers
---------+---------+-----------
column1 | integer |
-- verify shards are not dropped
\c - - - :worker_1_port
\d transactional_drop_shards_*;
Table "public.transactional_drop_shards_1410005"
Column | Type | Modifiers
---------+---------+-----------
column1 | integer |
\c - - - :master_port
-- test master_apply_delete_command in a transaction after insertion
BEGIN;
INSERT INTO transactional_drop_shards VALUES (1);
SELECT master_apply_delete_command('DELETE FROM transactional_drop_shards');
ERROR: shard drop operations must not appear in transaction blocks containing other distributed modifications
ROLLBACK;
-- verify metadata is not deleted
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid;
shardid
---------
1410005
(1 row)
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid)
ORDER BY
shardid;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
1410005 | 1 | localhost | 57637
1410005 | 1 | localhost | 57638
(2 rows)
-- verify shards are not dropped
\c - - - :worker_1_port
\d transactional_drop_shards_*;
Table "public.transactional_drop_shards_1410005"
Column | Type | Modifiers
---------+---------+-----------
column1 | integer |
-- test DROP table with failing worker
CREATE FUNCTION fail_drop_table() RETURNS event_trigger AS $fdt$
BEGIN
RAISE 'illegal value';
END;
$fdt$ LANGUAGE plpgsql;
CREATE EVENT TRIGGER fail_drop_table ON sql_drop EXECUTE PROCEDURE fail_drop_table();
\c - - - :master_port
\set VERBOSITY terse
DROP TABLE transactional_drop_shards;
ERROR: illegal value
\set VERBOSITY default
-- verify metadata is not deleted
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid;
shardid
---------
1410005
(1 row)
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid)
ORDER BY
shardid;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
1410005 | 1 | localhost | 57637
1410005 | 1 | localhost | 57638
(2 rows)
-- verify table is not dropped
\d transactional_drop_shards;
Table "public.transactional_drop_shards"
Column | Type | Modifiers
---------+---------+-----------
column1 | integer |
-- verify shards are not dropped
\c - - - :worker_1_port
\d transactional_drop_shards_*;
Table "public.transactional_drop_shards_1410005"
Column | Type | Modifiers
---------+---------+-----------
column1 | integer |
\c - - - :master_port
-- test DROP reference table with failing worker
CREATE TABLE transactional_drop_reference(column1 int);
SELECT create_reference_table('transactional_drop_reference');
create_reference_table
------------------------
(1 row)
\set VERBOSITY terse
DROP TABLE transactional_drop_reference;
ERROR: illegal value
\set VERBOSITY default
-- verify metadata is not deleted
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_reference'::regclass ORDER BY shardid;
shardid
---------
1410006
(1 row)
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_reference'::regclass ORDER BY shardid)
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
1410006 | 1 | localhost | 57637
1410006 | 1 | localhost | 57638
(2 rows)
-- verify table is not dropped
\d transactional_drop_reference;
Table "public.transactional_drop_reference"
Column | Type | Modifiers
---------+---------+-----------
column1 | integer |
-- verify shards are not dropped
\c - - - :worker_1_port
\d transactional_drop_reference*;
Table "public.transactional_drop_reference_1410006"
Column | Type | Modifiers
---------+---------+-----------
column1 | integer |
\c - - - :master_port
-- test master_apply_delete_command table with failing worker
\set VERBOSITY terse
SELECT master_apply_delete_command('DELETE FROM transactional_drop_shards');
ERROR: illegal value
\set VERBOSITY default
-- verify metadata is not deleted
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid;
shardid
---------
1410005
(1 row)
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid)
ORDER BY
shardid;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
1410005 | 1 | localhost | 57637
1410005 | 1 | localhost | 57638
(2 rows)
-- verify shards are not dropped
\c - - - :worker_1_port
\d transactional_drop_shards_*;
Table "public.transactional_drop_shards_1410005"
Column | Type | Modifiers
---------+---------+-----------
column1 | integer |
DROP EVENT TRIGGER fail_drop_table;
\c - - - :master_port
-- test with SERIAL column + with more shards
SET citus.shard_count TO 8;
CREATE TABLE transactional_drop_serial(column1 int, column2 SERIAL);
SELECT create_distributed_table('transactional_drop_serial', 'column1');
create_distributed_table
--------------------------
(1 row)
-- test DROP TABLE(ergo master_drop_all_shards) in transaction, then ROLLBACK
BEGIN;
DROP TABLE transactional_drop_serial;
ROLLBACK;
-- verify metadata is not deleted
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_serial'::regclass ORDER BY shardid;
shardid
---------
1410007
1410008
1410009
1410010
1410011
1410012
1410013
1410014
(8 rows)
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_serial'::regclass ORDER BY shardid)
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
1410007 | 1 | localhost | 57637
1410007 | 1 | localhost | 57638
1410008 | 1 | localhost | 57637
1410008 | 1 | localhost | 57638
1410009 | 1 | localhost | 57637
1410009 | 1 | localhost | 57638
1410010 | 1 | localhost | 57637
1410010 | 1 | localhost | 57638
1410011 | 1 | localhost | 57637
1410011 | 1 | localhost | 57638
1410012 | 1 | localhost | 57637
1410012 | 1 | localhost | 57638
1410013 | 1 | localhost | 57637
1410013 | 1 | localhost | 57638
1410014 | 1 | localhost | 57637
1410014 | 1 | localhost | 57638
(16 rows)
-- verify table is not dropped
\d transactional_drop_serial;
Table "public.transactional_drop_serial"
Column | Type | Modifiers
---------+---------+-----------------------------------------------------------------------------
column1 | integer |
column2 | integer | not null default nextval('transactional_drop_serial_column2_seq'::regclass)
-- verify shards and sequence are not dropped
\c - - - :worker_1_port
\d transactional_drop_serial_1410006;
\ds transactional_drop_serial_column2_seq
List of relations
Schema | Name | Type | Owner
--------+---------------------------------------+----------+----------
public | transactional_drop_serial_column2_seq | sequence | postgres
(1 row)
\c - - - :master_port
-- test DROP TABLE(ergo master_drop_all_shards) in transaction, then COMMIT
BEGIN;
DROP TABLE transactional_drop_serial;
COMMIT;
-- verify metadata is deleted
SELECT shardid FROM pg_dist_shard WHERE shardid IN (1410007, 1410008, 1410009, 1410010, 1410011, 1410012, 1410013, 1410014) ORDER BY shardid;
shardid
---------
(0 rows)
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (1410007, 1410008, 1410009, 1410010, 1410011, 1410012, 1410013, 1410014)
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+----------+----------
(0 rows)
-- verify table is dropped
\d transactional_drop_serial;
-- verify shards and sequence are dropped
\c - - - :worker_1_port
\d transactional_drop_serial_1410006;
\ds transactional_drop_serial_column2_seq
List of relations
Schema | Name | Type | Owner
--------+------+------+-------
(0 rows)
\c - - - :master_port
-- test with MX, DROP TABLE, then ROLLBACK
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 4;
CREATE TABLE transactional_drop_mx(column1 int);
SELECT create_distributed_table('transactional_drop_mx', 'column1');
create_distributed_table
--------------------------
(1 row)
UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='transactional_drop_mx'::regclass;
-- make worker 1 receive metadata changes
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
-- see metadata is propogated to the worker
\c - - - :worker_1_port
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_mx'::regclass ORDER BY shardid;
shardid
---------
1410015
1410016
1410017
1410018
(4 rows)
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_mx'::regclass ORDER BY shardid)
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
1410015 | 1 | localhost | 57637
1410016 | 1 | localhost | 57638
1410017 | 1 | localhost | 57637
1410018 | 1 | localhost | 57638
(4 rows)
\c - - - :master_port
BEGIN;
DROP TABLE transactional_drop_mx;
ROLLBACK;
-- verify metadata is not deleted
\c - - - :worker_1_port
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_mx'::regclass ORDER BY shardid;
shardid
---------
1410015
1410016
1410017
1410018
(4 rows)
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_mx'::regclass ORDER BY shardid)
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
1410015 | 1 | localhost | 57637
1410016 | 1 | localhost | 57638
1410017 | 1 | localhost | 57637
1410018 | 1 | localhost | 57638
(4 rows)
-- test with MX, DROP TABLE, then COMMIT
\c - - - :master_port
BEGIN;
DROP TABLE transactional_drop_mx;
COMMIT;
-- verify metadata is deleted
\c - - - :worker_1_port
SELECT shardid FROM pg_dist_shard WHERE shardid IN (1410015, 1410016, 1410017, 1410018) ORDER BY shardid;
shardid
---------
(0 rows)
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (1410015, 1410016, 1410017, 1410018)
ORDER BY
shardid, nodename, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+----------+----------
(0 rows)
\c - - - :master_port
-- clean the workspace
DROP TABLE transactional_drop_shards, transactional_drop_reference;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
----------------------------
(1 row)

View File

@ -68,11 +68,8 @@ SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::r
--------- ---------
(0 rows) (0 rows)
-- command can not be run inside transaction -- command can run inside transaction
BEGIN; TRUNCATE TABLE test_truncate_append; COMMIT; BEGIN; TRUNCATE TABLE test_truncate_append; COMMIT;
ERROR: DROP distributed table cannot run inside a transaction block
CONTEXT: SQL statement "SELECT master_drop_all_shards(TG_RELID, TG_TABLE_SCHEMA, TG_TABLE_NAME)"
PL/pgSQL function citus_truncate_trigger() line 13 at PERFORM
DROP TABLE test_truncate_append; DROP TABLE test_truncate_append;
-- --
-- truncate for range distribution -- truncate for range distribution

View File

@ -52,7 +52,8 @@ SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol
WHERE c_custkey > 1000'); WHERE c_custkey > 1000');
SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol'); SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol');
-- Verify that master_apply_delete_command cannot be called in a transaction block -- Verify that master_apply_delete_command can be called in a transaction block
SELECT 1 AS one FROM master_create_empty_shard('customer_delete_protocol');
BEGIN; BEGIN;
SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol'); SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol');
ROLLBACK; COMMIT;

View File

@ -215,3 +215,8 @@ test: multi_foreign_key
test: multi_upgrade_reference_table test: multi_upgrade_reference_table
test: multi_replicate_reference_table test: multi_replicate_reference_table
test: multi_remove_node_reference_table test: multi_remove_node_reference_table
# ----------
# multi_transactional_drop_shards tests for dropping shards using connection API
# ----------
test: multi_transactional_drop_shards

View File

@ -94,8 +94,18 @@ SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol');
1 1
(1 row) (1 row)
-- Verify that master_apply_delete_command cannot be called in a transaction block -- Verify that master_apply_delete_command can be called in a transaction block
SELECT 1 AS one FROM master_create_empty_shard('customer_delete_protocol');
one
-----
1
(1 row)
BEGIN; BEGIN;
SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol'); SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol');
ERROR: master_apply_delete_command cannot run inside a transaction block master_apply_delete_command
ROLLBACK; -----------------------------
1
(1 row)
COMMIT;

View File

@ -392,11 +392,47 @@ SELECT master_add_node('localhost', :worker_2_port);
-- test DROP table after removing a node in a transaction -- test DROP table after removing a node in a transaction
-- status before master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
BEGIN; BEGIN;
SELECT master_remove_node('localhost', :worker_2_port); SELECT master_remove_node('localhost', :worker_2_port);
DROP TABLE remove_node_reference_table; DROP TABLE remove_node_reference_table;
ROLLBACK; COMMIT;
-- status after master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT * FROM pg_dist_colocation WHERE colocationid = 1380000;
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
-- re-create remove_node_reference_table
CREATE TABLE remove_node_reference_table(column1 int);
SELECT create_reference_table('remove_node_reference_table');
-- test removing a node while there is a reference table at another schema -- test removing a node while there is a reference table at another schema
CREATE SCHEMA remove_node_reference_table_schema; CREATE SCHEMA remove_node_reference_table_schema;

View File

@ -345,14 +345,39 @@ DROP TABLE replicate_reference_table_ddl;
CREATE TABLE replicate_reference_table_drop(column1 int); CREATE TABLE replicate_reference_table_drop(column1 int);
SELECT create_reference_table('replicate_reference_table_drop'); SELECT create_reference_table('replicate_reference_table_drop');
-- status before master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_drop'::regclass);
BEGIN; BEGIN;
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
DROP TABLE replicate_reference_table_drop; DROP TABLE replicate_reference_table_drop;
ROLLBACK; COMMIT;
DROP TABLE replicate_reference_table_drop; -- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT * FROM pg_dist_colocation WHERE colocationid = 1370009;
-- test adding a node while there is a reference table at another schema -- test adding a node while there is a reference table at another schema
SELECT master_remove_node('localhost', :worker_2_port);
CREATE SCHEMA replicate_reference_table_schema; CREATE SCHEMA replicate_reference_table_schema;
CREATE TABLE replicate_reference_table_schema.table1(column1 int); CREATE TABLE replicate_reference_table_schema.table1(column1 int);
SELECT create_reference_table('replicate_reference_table_schema.table1'); SELECT create_reference_table('replicate_reference_table_schema.table1');

View File

@ -20,13 +20,17 @@ ALTER TABLE testtableddl ALTER COLUMN distributecol TYPE text;
-- verify that the distribution column can't be dropped -- verify that the distribution column can't be dropped
ALTER TABLE testtableddl DROP COLUMN distributecol; ALTER TABLE testtableddl DROP COLUMN distributecol;
-- verify that the table cannot be dropped in a transaction block -- verify that the table can be dropped in a transaction block
\set VERBOSITY terse \set VERBOSITY terse
BEGIN; BEGIN;
DROP TABLE testtableddl; DROP TABLE testtableddl;
ROLLBACK; COMMIT;
\set VERBOSITY default \set VERBOSITY default
-- recreate testtableddl
CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL);
SELECT master_create_distributed_table('testtableddl', 'distributecol', 'append');
-- verify that the table can be dropped -- verify that the table can be dropped
DROP TABLE testtableddl; DROP TABLE testtableddl;

View File

@ -0,0 +1,369 @@
--
-- MULTI_TRANSACTIONAL_DROP_SHARDS
--
-- Tests that check the metadata returned by the master node.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1410000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1410000;
SET citus.shard_count TO 4;
-- test DROP TABLE(ergo master_drop_all_shards) in transaction, then ROLLBACK
CREATE TABLE transactional_drop_shards(column1 int);
SELECT create_distributed_table('transactional_drop_shards', 'column1');
BEGIN;
DROP TABLE transactional_drop_shards;
ROLLBACK;
-- verify metadata is not deleted
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid;
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid)
ORDER BY
shardid;
-- verify table is not dropped
\d transactional_drop_shards;
-- verify shards are not dropped
\c - - - :worker_1_port
\d transactional_drop_shards_*;
\c - - - :master_port
-- test DROP TABLE(ergo master_drop_all_shards) in transaction, then COMMIT
BEGIN;
DROP TABLE transactional_drop_shards;
COMMIT;
-- verify metadata is deleted
SELECT shardid FROM pg_dist_shard WHERE shardid IN (1410000, 1410001, 1410002, 1410003) ORDER BY shardid;
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (1410000, 1410001, 1410002, 1410003)
ORDER BY
shardid;
-- verify table is dropped
\d transactional_drop_shards;
-- verify shards are dropped
\c - - - :worker_1_port
\d transactional_drop_shards_*;
\c - - - :master_port
-- test master_delete_protocol in transaction, then ROLLBACK
CREATE TABLE transactional_drop_shards(column1 int);
SELECT create_distributed_table('transactional_drop_shards', 'column1', 'append');
SELECT master_create_empty_shard('transactional_drop_shards');
BEGIN;
SELECT master_apply_delete_command('DELETE FROM transactional_drop_shards');
ROLLBACK;
-- verify metadata is not deleted
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid;
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid)
ORDER BY
shardid;
-- verify shards are not dropped
\c - - - :worker_1_port
\d transactional_drop_shards_*;
\c - - - :master_port
-- test master_delete_protocol in transaction, then COMMIT
BEGIN;
SELECT master_apply_delete_command('DELETE FROM transactional_drop_shards');
COMMIT;
-- verify metadata is deleted
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid;
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid)
ORDER BY
shardid;
-- verify shards are dropped
\c - - - :worker_1_port
\d transactional_drop_shards_*;
\c - - - :master_port
-- test DROP table in a transaction after insertion
SELECT master_create_empty_shard('transactional_drop_shards');
BEGIN;
INSERT INTO transactional_drop_shards VALUES (1);
DROP TABLE transactional_drop_shards;
ROLLBACK;
-- verify metadata is not deleted
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid;
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid)
ORDER BY
shardid;
-- verify table is not dropped
\d transactional_drop_shards;
-- verify shards are not dropped
\c - - - :worker_1_port
\d transactional_drop_shards_*;
\c - - - :master_port
-- test master_apply_delete_command in a transaction after insertion
BEGIN;
INSERT INTO transactional_drop_shards VALUES (1);
SELECT master_apply_delete_command('DELETE FROM transactional_drop_shards');
ROLLBACK;
-- verify metadata is not deleted
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid;
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid)
ORDER BY
shardid;
-- verify shards are not dropped
\c - - - :worker_1_port
\d transactional_drop_shards_*;
-- test DROP table with failing worker
CREATE FUNCTION fail_drop_table() RETURNS event_trigger AS $fdt$
BEGIN
RAISE 'illegal value';
END;
$fdt$ LANGUAGE plpgsql;
CREATE EVENT TRIGGER fail_drop_table ON sql_drop EXECUTE PROCEDURE fail_drop_table();
\c - - - :master_port
\set VERBOSITY terse
DROP TABLE transactional_drop_shards;
\set VERBOSITY default
-- verify metadata is not deleted
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid;
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid)
ORDER BY
shardid;
-- verify table is not dropped
\d transactional_drop_shards;
-- verify shards are not dropped
\c - - - :worker_1_port
\d transactional_drop_shards_*;
\c - - - :master_port
-- test DROP reference table with failing worker
CREATE TABLE transactional_drop_reference(column1 int);
SELECT create_reference_table('transactional_drop_reference');
\set VERBOSITY terse
DROP TABLE transactional_drop_reference;
\set VERBOSITY default
-- verify metadata is not deleted
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_reference'::regclass ORDER BY shardid;
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_reference'::regclass ORDER BY shardid)
ORDER BY
shardid, nodename, nodeport;
-- verify table is not dropped
\d transactional_drop_reference;
-- verify shards are not dropped
\c - - - :worker_1_port
\d transactional_drop_reference*;
\c - - - :master_port
-- test master_apply_delete_command table with failing worker
\set VERBOSITY terse
SELECT master_apply_delete_command('DELETE FROM transactional_drop_shards');
\set VERBOSITY default
-- verify metadata is not deleted
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid;
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_shards'::regclass ORDER BY shardid)
ORDER BY
shardid;
-- verify shards are not dropped
\c - - - :worker_1_port
\d transactional_drop_shards_*;
DROP EVENT TRIGGER fail_drop_table;
\c - - - :master_port
-- test with SERIAL column + with more shards
SET citus.shard_count TO 8;
CREATE TABLE transactional_drop_serial(column1 int, column2 SERIAL);
SELECT create_distributed_table('transactional_drop_serial', 'column1');
-- test DROP TABLE(ergo master_drop_all_shards) in transaction, then ROLLBACK
BEGIN;
DROP TABLE transactional_drop_serial;
ROLLBACK;
-- verify metadata is not deleted
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_serial'::regclass ORDER BY shardid;
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_serial'::regclass ORDER BY shardid)
ORDER BY
shardid, nodename, nodeport;
-- verify table is not dropped
\d transactional_drop_serial;
-- verify shards and sequence are not dropped
\c - - - :worker_1_port
\d transactional_drop_serial_1410006;
\ds transactional_drop_serial_column2_seq
\c - - - :master_port
-- test DROP TABLE(ergo master_drop_all_shards) in transaction, then COMMIT
BEGIN;
DROP TABLE transactional_drop_serial;
COMMIT;
-- verify metadata is deleted
SELECT shardid FROM pg_dist_shard WHERE shardid IN (1410007, 1410008, 1410009, 1410010, 1410011, 1410012, 1410013, 1410014) ORDER BY shardid;
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (1410007, 1410008, 1410009, 1410010, 1410011, 1410012, 1410013, 1410014)
ORDER BY
shardid, nodename, nodeport;
-- verify table is dropped
\d transactional_drop_serial;
-- verify shards and sequence are dropped
\c - - - :worker_1_port
\d transactional_drop_serial_1410006;
\ds transactional_drop_serial_column2_seq
\c - - - :master_port
-- test with MX, DROP TABLE, then ROLLBACK
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 4;
CREATE TABLE transactional_drop_mx(column1 int);
SELECT create_distributed_table('transactional_drop_mx', 'column1');
UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='transactional_drop_mx'::regclass;
-- make worker 1 receive metadata changes
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- see metadata is propogated to the worker
\c - - - :worker_1_port
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_mx'::regclass ORDER BY shardid;
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_mx'::regclass ORDER BY shardid)
ORDER BY
shardid, nodename, nodeport;
\c - - - :master_port
BEGIN;
DROP TABLE transactional_drop_mx;
ROLLBACK;
-- verify metadata is not deleted
\c - - - :worker_1_port
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_mx'::regclass ORDER BY shardid;
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'transactional_drop_mx'::regclass ORDER BY shardid)
ORDER BY
shardid, nodename, nodeport;
-- test with MX, DROP TABLE, then COMMIT
\c - - - :master_port
BEGIN;
DROP TABLE transactional_drop_mx;
COMMIT;
-- verify metadata is deleted
\c - - - :worker_1_port
SELECT shardid FROM pg_dist_shard WHERE shardid IN (1410015, 1410016, 1410017, 1410018) ORDER BY shardid;
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (1410015, 1410016, 1410017, 1410018)
ORDER BY
shardid, nodename, nodeport;
\c - - - :master_port
-- clean the workspace
DROP TABLE transactional_drop_shards, transactional_drop_reference;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);

View File

@ -41,7 +41,7 @@ SELECT count(*) FROM test_truncate_append;
-- verify no shard exists anymore -- verify no shard exists anymore
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass; SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass;
-- command can not be run inside transaction -- command can run inside transaction
BEGIN; TRUNCATE TABLE test_truncate_append; COMMIT; BEGIN; TRUNCATE TABLE test_truncate_append; COMMIT;
DROP TABLE test_truncate_append; DROP TABLE test_truncate_append;