Merge pull request #1168 from citusdata/copy_inactive

Set placement to inactive on connection failure in COPY
pull/1169/head
Marco Slot 2017-01-26 13:23:13 +04:00 committed by GitHub
commit c44ae463ae
9 changed files with 734 additions and 35 deletions

View File

@ -536,6 +536,9 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
EndCopyFrom(copyState); EndCopyFrom(copyState);
heap_close(distributedRelation, NoLock); heap_close(distributedRelation, NoLock);
/* mark failed placements as inactive */
CheckForFailedPlacements(true, CoordinatedTransactionUses2PC);
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
if (completionTag != NULL) if (completionTag != NULL)
@ -834,7 +837,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
bool stopOnFailure, bool useBinaryCopyFormat) bool stopOnFailure, bool useBinaryCopyFormat)
{ {
List *finalizedPlacementList = NIL; List *finalizedPlacementList = NIL;
List *failedPlacementList = NIL; int failedPlacementCount = 0;
ListCell *placementCell = NULL; ListCell *placementCell = NULL;
List *connectionList = NULL; List *connectionList = NULL;
int64 shardId = shardConnections->shardId; int64 shardId = shardConnections->shardId;
@ -863,8 +866,6 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
foreach(placementCell, finalizedPlacementList) foreach(placementCell, finalizedPlacementList)
{ {
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
char *nodeName = placement->nodeName;
int nodePort = placement->nodePort;
char *nodeUser = CurrentUserName(); char *nodeUser = CurrentUserName();
MultiConnection *connection = NULL; MultiConnection *connection = NULL;
uint32 connectionFlags = FOR_DML; uint32 connectionFlags = FOR_DML;
@ -877,23 +878,24 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
{ {
if (stopOnFailure) if (stopOnFailure)
{ {
ereport(ERROR, (errmsg("could not open connection to %s:%d", ReportConnectionError(connection, ERROR);
nodeName, nodePort)));
} }
else
{
ReportConnectionError(connection, WARNING);
MarkRemoteTransactionFailed(connection, true);
failedPlacementList = lappend(failedPlacementList, placement); failedPlacementCount++;
continue; continue;
}
} }
/* /*
* If errors are supposed to cause immediate aborts (i.e. we don't * Errors are supposed to cause immediate aborts (i.e. we don't
* want to/can't invalidate placements), mark the connection as * want to/can't invalidate placements), mark the connection as
* critical so later errors cause failures. * critical so later errors cause failures.
*/ */
if (stopOnFailure) MarkRemoteTransactionCritical(connection);
{
MarkRemoteTransactionCritical(connection);
}
ClaimConnectionExclusively(connection); ClaimConnectionExclusively(connection);
RemoteTransactionBeginIfNecessary(connection); RemoteTransactionBeginIfNecessary(connection);
copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId, copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId,
@ -902,14 +904,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
if (PQresultStatus(result) != PGRES_COPY_IN) if (PQresultStatus(result) != PGRES_COPY_IN)
{ {
ReportConnectionError(connection, WARNING); ReportResultError(connection, result, ERROR);
MarkRemoteTransactionFailed(connection, true);
PQclear(result);
/* failed placements will be invalidated by transaction machinery */
failedPlacementList = lappend(failedPlacementList, placement);
continue;
} }
PQclear(result); PQclear(result);
@ -917,9 +912,9 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
} }
/* if all placements failed, error out */ /* if all placements failed, error out */
if (list_length(failedPlacementList) == list_length(finalizedPlacementList)) if (failedPlacementCount == list_length(finalizedPlacementList))
{ {
ereport(ERROR, (errmsg("could not find any active placements"))); ereport(ERROR, (errmsg("could not connect to any active placements")));
} }
/* /*
@ -927,7 +922,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
* never reach this point. This is the case for reference tables and * never reach this point. This is the case for reference tables and
* copy from worker nodes. * copy from worker nodes.
*/ */
Assert(!stopOnFailure || list_length(failedPlacementList) == 0); Assert(!stopOnFailure || failedPlacementCount == 0);
shardConnections->connectionList = connectionList; shardConnections->connectionList = connectionList;

View File

@ -1324,7 +1324,8 @@ NOTICE: Replicating reference table "mx_ref" to all workers
SELECT shardid, nodename, nodeport SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass; WHERE logicalrelid='mx_ref'::regclass
ORDER BY shardid, nodeport;
shardid | nodename | nodeport shardid | nodename | nodeport
---------+-----------+---------- ---------+-----------+----------
1310184 | localhost | 57637 1310184 | localhost | 57637
@ -1334,7 +1335,8 @@ WHERE logicalrelid='mx_ref'::regclass;
\c - - - :worker_1_port \c - - - :worker_1_port
SELECT shardid, nodename, nodeport SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass; WHERE logicalrelid='mx_ref'::regclass
ORDER BY shardid, nodeport;
shardid | nodename | nodeport shardid | nodename | nodeport
---------+-----------+---------- ---------+-----------+----------
1310184 | localhost | 57637 1310184 | localhost | 57637

View File

@ -337,9 +337,9 @@ DELETE FROM researchers WHERE lab_id = 6;
\copy researchers FROM STDIN delimiter ',' \copy researchers FROM STDIN delimiter ','
COMMIT; COMMIT;
WARNING: illegal value WARNING: illegal value
WARNING: failed to commit transaction on localhost:57638 WARNING: failed to commit critical transaction on localhost:57638, metadata is likely out of sync
WARNING: illegal value WARNING: illegal value
WARNING: failed to commit transaction on localhost:57637 WARNING: failed to commit critical transaction on localhost:57637, metadata is likely out of sync
WARNING: could not commit transaction for shard 1200001 on any active node WARNING: could not commit transaction for shard 1200001 on any active node
ERROR: could not commit transaction on any active node ERROR: could not commit transaction on any active node
\unset VERBOSITY \unset VERBOSITY
@ -1127,6 +1127,15 @@ SELECT create_reference_table('reference_failure_test');
(1 row) (1 row)
-- create a hash distributed table
SET citus.shard_count TO 4;
CREATE TABLE numbers_hash_failure_test(key int, value int);
SELECT create_distributed_table('numbers_hash_failure_test', 'key');
create_distributed_table
--------------------------
(1 row)
-- ensure that the shard is created for this user -- ensure that the shard is created for this user
\c - test_user - :worker_1_port \c - test_user - :worker_1_port
\dt reference_failure_test_1200015 \dt reference_failure_test_1200015
@ -1152,6 +1161,10 @@ INSERT INTO reference_failure_test VALUES (1, '1');
WARNING: connection error: localhost:57637 WARNING: connection error: localhost:57637
ERROR: failure on connection marked as essential: localhost:57637 ERROR: failure on connection marked as essential: localhost:57637
COMMIT; COMMIT;
BEGIN;
COPY reference_failure_test FROM STDIN WITH (FORMAT 'csv');
ERROR: connection error: localhost:57637
COMMIT;
-- show that no data go through the table and shard states are good -- show that no data go through the table and shard states are good
SELECT * FROM reference_failure_test; SELECT * FROM reference_failure_test;
key | value key | value
@ -1171,9 +1184,114 @@ ORDER BY s.logicalrelid, sp.shardstate;
reference_failure_test | 1 | 2 reference_failure_test | 1 | 2
(1 row) (1 row)
BEGIN;
COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
WARNING: connection error: localhost:57637
WARNING: connection error: localhost:57637
-- some placements are invalid before abort
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
ORDER BY shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
1200016 | 3 | localhost | 57637
1200016 | 1 | localhost | 57638
1200017 | 1 | localhost | 57637
1200017 | 1 | localhost | 57638
1200018 | 1 | localhost | 57637
1200018 | 1 | localhost | 57638
1200019 | 3 | localhost | 57637
1200019 | 1 | localhost | 57638
(8 rows)
ABORT;
-- verify nothing is inserted
SELECT count(*) FROM numbers_hash_failure_test;
WARNING: connection error: localhost:57637
WARNING: connection error: localhost:57637
count
-------
0
(1 row)
-- all placements to be marked valid
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
ORDER BY shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
1200016 | 1 | localhost | 57637
1200016 | 1 | localhost | 57638
1200017 | 1 | localhost | 57637
1200017 | 1 | localhost | 57638
1200018 | 1 | localhost | 57637
1200018 | 1 | localhost | 57638
1200019 | 1 | localhost | 57637
1200019 | 1 | localhost | 57638
(8 rows)
BEGIN;
COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
WARNING: connection error: localhost:57637
WARNING: connection error: localhost:57637
-- check shard states before commit
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
ORDER BY shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
1200016 | 3 | localhost | 57637
1200016 | 1 | localhost | 57638
1200017 | 1 | localhost | 57637
1200017 | 1 | localhost | 57638
1200018 | 1 | localhost | 57637
1200018 | 1 | localhost | 57638
1200019 | 3 | localhost | 57637
1200019 | 1 | localhost | 57638
(8 rows)
COMMIT;
-- expect some placements to be marked invalid after commit
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
ORDER BY shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
1200016 | 3 | localhost | 57637
1200016 | 1 | localhost | 57638
1200017 | 1 | localhost | 57637
1200017 | 1 | localhost | 57638
1200018 | 1 | localhost | 57637
1200018 | 1 | localhost | 57638
1200019 | 3 | localhost | 57637
1200019 | 1 | localhost | 57638
(8 rows)
-- verify data is inserted
SELECT count(*) FROM numbers_hash_failure_test;
WARNING: connection error: localhost:57637
WARNING: connection error: localhost:57637
count
-------
2
(1 row)
-- connect back to the worker and set rename the test_user back -- connect back to the worker and set rename the test_user back
\c - :default_user - :worker_1_port \c - :default_user - :worker_1_port
ALTER USER test_user_new RENAME TO test_user; ALTER USER test_user_new RENAME TO test_user;
-- connect back to the master with the proper user to continue the tests -- connect back to the master with the proper user to continue the tests
\c - :default_user - :master_port \c - :default_user - :master_port
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second; DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second,
reference_failure_test, numbers_hash_failure_test;
SELECT * FROM run_command_on_workers('DROP USER test_user');
nodename | nodeport | success | result
-----------+----------+---------+-----------
localhost | 57637 | t | DROP ROLE
localhost | 57638 | t | DROP ROLE
(2 rows)
DROP USER test_user;

View File

@ -777,14 +777,15 @@ SELECT
FROM FROM
pg_dist_shard_placement pg_dist_shard_placement
WHERE WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid;
shardid | shardstate | shardlength | nodename | nodeport shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+---------- ---------+------------+-------------+-----------+----------
1380001 | 1 | 0 | localhost | 57638 1380001 | 1 | 0 | localhost | 57638
1380002 | 1 | 0 | localhost | 57638 1380002 | 1 | 0 | localhost | 57638
(2 rows) (2 rows)
\c - - - :master_port \c - - - :master_port
SELECT master_remove_node('localhost', :worker_2_port); SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node master_remove_node

View File

@ -538,3 +538,214 @@ SELECT master_create_distributed_table('composite_partition_column_table', 'comp
1,"(1,1)" 1,"(1,1)"
2,"(2,2)" 2,"(2,2)"
\. \.
-- Test copy on append distributed tables do not create shards on removed workers
CREATE TABLE numbers_append (a int, b int);
SELECT master_create_distributed_table('numbers_append', 'a', 'append');
-- no shards are created yet
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
1,1
2,2
\.
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
3,5
4,6
\.
-- verify there are shards at both workers
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
-- disable the first node
SELECT master_disable_node('localhost', :worker_1_port);
-- set replication factor to 1 so that copy will
-- succeed without replication count error
SET citus.shard_replication_factor TO 1;
-- add two new shards and verify they are created at the other node
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
5,7
6,8
\.
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
7,9
8,10
\.
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
-- add the node back
SELECT master_add_node('localhost', :worker_1_port);
RESET citus.shard_replication_factor;
-- add two new shards and verify they are created at both workers
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
9,11
10,12
\.
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
11,13
12,14
\.
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
DROP TABLE numbers_append;
-- Test copy failures against connection failures
-- create and switch to test user
CREATE USER test_user;
SELECT * FROM run_command_on_workers('CREATE USER test_user');
\c - test_user
SET citus.shard_count to 4;
CREATE TABLE numbers_hash (a int, b int);
SELECT create_distributed_table('numbers_hash', 'a');
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
1,1
2,2
3,3
4,4
5,5
6,6
7,7
8,8
\.
-- verify each placement is active
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash'::regclass order by placementid;
-- create a reference table
CREATE TABLE numbers_reference(a int, b int);
SELECT create_reference_table('numbers_reference');
COPY numbers_reference FROM STDIN WITH (FORMAT 'csv');
1,1
2,2
\.
-- create another hash distributed table
CREATE TABLE numbers_hash_other(a int, b int);
SELECT create_distributed_table('numbers_hash_other', 'a');
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash_other'::regclass order by placementid;
-- manually corrupt pg_dist_shard such that both copies of one shard is placed in
-- worker_1. This is to test the behavior when no replica of a shard is accessible.
-- Whole copy operation is supposed to fail and rollback.
\c - :default_user
UPDATE pg_dist_shard_placement SET nodeport = :worker_1_port WHERE shardid = 560176;
-- disable test_user on the first worker
\c - :default_user - :worker_1_port
ALTER USER test_user WITH nologin;
\c - test_user - :master_port
-- reissue copy
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
1,1
2,2
3,3
4,4
5,5
6,6
7,7
8,8
\.
-- verify shards in the first worker are marked invalid
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash'::regclass order by placementid;
-- try to insert into a reference table copy should fail
COPY numbers_reference FROM STDIN WITH (FORMAT 'csv');
3,1
4,2
\.
-- verify shards for reference table are still valid
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_reference'::regclass order by placementid;
-- try to insert into numbers_hash_other. copy should fail and rollback
-- since it can not insert into either copies of a shard. shards are expected to
-- stay valid since the operation is rolled back.
COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv');
1,1
2,2
3,3
\.
-- verify shards for numbers_hash_other are still valid
-- since copy has failed altogether
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash_other'::regclass order by placementid;
-- re-enable test_user on the first worker
\c - :default_user - :worker_1_port
ALTER USER test_user WITH login;
-- there is a dangling shard in worker_2, drop it
\c - test_user - :worker_2_port
DROP TABLE numbers_hash_other_560176;
\c - test_user - :master_port
DROP TABLE numbers_hash;
DROP TABLE numbers_hash_other;
DROP TABLE numbers_reference;
\c - :default_user
-- test copy failure inside the node
-- it will be done by changing definition of a shard table
SET citus.shard_count to 4;
CREATE TABLE numbers_hash(a int, b int);
SELECT create_distributed_table('numbers_hash', 'a');
\c - - - :worker_1_port
ALTER TABLE numbers_hash_560180 ADD COLUMN c int;
\c - - - :master_port
-- operation will fail to modify a shard and roll back
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
1,1
2,2
3,3
4,4
5,5
6,6
7,7
8,8
\.
-- verify no row is inserted
SELECT * FROM numbers_hash;
-- verify shard is still marked as valid
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash'::regclass order by placementid;
DROP TABLE numbers_hash;
SELECT * FROM run_command_on_workers('DROP USER test_user');
DROP USER test_user;

View File

@ -708,3 +708,310 @@ CONTEXT: while executing command on localhost:57638
WARNING: could not get statistics for shard public.composite_partition_column_table_560164 WARNING: could not get statistics for shard public.composite_partition_column_table_560164
DETAIL: Setting shard statistics to NULL DETAIL: Setting shard statistics to NULL
ERROR: failure on connection marked as essential: localhost:57637 ERROR: failure on connection marked as essential: localhost:57637
-- Test copy on append distributed tables do not create shards on removed workers
CREATE TABLE numbers_append (a int, b int);
SELECT master_create_distributed_table('numbers_append', 'a', 'append');
master_create_distributed_table
---------------------------------
(1 row)
-- no shards are created yet
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
shardid | nodename | nodeport
---------+----------+----------
(0 rows)
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
-- verify there are shards at both workers
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
shardid | nodename | nodeport
---------+-----------+----------
560165 | localhost | 57637
560165 | localhost | 57638
560166 | localhost | 57638
560166 | localhost | 57637
(4 rows)
-- disable the first node
SELECT master_disable_node('localhost', :worker_1_port);
NOTICE: Node localhost:57637 has active shard placements. Some queries may fail after this operation. Use select master_add_node('localhost', 57637) to add this node back.
master_disable_node
---------------------
(1 row)
-- set replication factor to 1 so that copy will
-- succeed without replication count error
SET citus.shard_replication_factor TO 1;
-- add two new shards and verify they are created at the other node
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
shardid | nodename | nodeport
---------+-----------+----------
560165 | localhost | 57637
560165 | localhost | 57638
560166 | localhost | 57638
560166 | localhost | 57637
560167 | localhost | 57638
560168 | localhost | 57638
(6 rows)
-- add the node back
SELECT master_add_node('localhost', :worker_1_port);
NOTICE: Replicating reference table "nation" to all workers
NOTICE: Replicating reference table "supplier" to all workers
master_add_node
---------------------------------
(3,3,localhost,57637,default,f)
(1 row)
RESET citus.shard_replication_factor;
-- add two new shards and verify they are created at both workers
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
shardid | nodename | nodeport
---------+-----------+----------
560165 | localhost | 57637
560165 | localhost | 57638
560166 | localhost | 57638
560166 | localhost | 57637
560167 | localhost | 57638
560168 | localhost | 57638
560169 | localhost | 57637
560169 | localhost | 57638
560170 | localhost | 57638
560170 | localhost | 57637
(10 rows)
DROP TABLE numbers_append;
-- Test copy failures against connection failures
-- create and switch to test user
CREATE USER test_user;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
SELECT * FROM run_command_on_workers('CREATE USER test_user');
nodename | nodeport | success | result
-----------+----------+---------+-------------
localhost | 57637 | t | CREATE ROLE
localhost | 57638 | t | CREATE ROLE
(2 rows)
\c - test_user
SET citus.shard_count to 4;
CREATE TABLE numbers_hash (a int, b int);
SELECT create_distributed_table('numbers_hash', 'a');
create_distributed_table
--------------------------
(1 row)
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
-- verify each placement is active
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash'::regclass order by placementid;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
560171 | 1 | localhost | 57637
560171 | 1 | localhost | 57638
560172 | 1 | localhost | 57638
560172 | 1 | localhost | 57637
560173 | 1 | localhost | 57637
560173 | 1 | localhost | 57638
560174 | 1 | localhost | 57638
560174 | 1 | localhost | 57637
(8 rows)
-- create a reference table
CREATE TABLE numbers_reference(a int, b int);
SELECT create_reference_table('numbers_reference');
create_reference_table
------------------------
(1 row)
COPY numbers_reference FROM STDIN WITH (FORMAT 'csv');
-- create another hash distributed table
CREATE TABLE numbers_hash_other(a int, b int);
SELECT create_distributed_table('numbers_hash_other', 'a');
create_distributed_table
--------------------------
(1 row)
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash_other'::regclass order by placementid;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
560176 | 1 | localhost | 57638
560176 | 1 | localhost | 57637
560177 | 1 | localhost | 57637
560177 | 1 | localhost | 57638
560178 | 1 | localhost | 57638
560178 | 1 | localhost | 57637
560179 | 1 | localhost | 57637
560179 | 1 | localhost | 57638
(8 rows)
-- manually corrupt pg_dist_shard such that both copies of one shard is placed in
-- worker_1. This is to test the behavior when no replica of a shard is accessible.
-- Whole copy operation is supposed to fail and rollback.
\c - :default_user
UPDATE pg_dist_shard_placement SET nodeport = :worker_1_port WHERE shardid = 560176;
-- disable test_user on the first worker
\c - :default_user - :worker_1_port
ALTER USER test_user WITH nologin;
\c - test_user - :master_port
-- reissue copy
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 1: "1,1"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 2: "2,2"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 3: "3,3"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 6: "6,6"
-- verify shards in the first worker are marked invalid
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash'::regclass order by placementid;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
560171 | 3 | localhost | 57637
560171 | 1 | localhost | 57638
560172 | 1 | localhost | 57638
560172 | 3 | localhost | 57637
560173 | 3 | localhost | 57637
560173 | 1 | localhost | 57638
560174 | 1 | localhost | 57638
560174 | 3 | localhost | 57637
(8 rows)
-- try to insert into a reference table copy should fail
COPY numbers_reference FROM STDIN WITH (FORMAT 'csv');
ERROR: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_reference, line 1: "3,1"
-- verify shards for reference table are still valid
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_reference'::regclass order by placementid;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
560175 | 1 | localhost | 57637
560175 | 1 | localhost | 57638
(2 rows)
-- try to insert into numbers_hash_other. copy should fail and rollback
-- since it can not insert into either copies of a shard. shards are expected to
-- stay valid since the operation is rolled back.
COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv');
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
ERROR: could not connect to any active placements
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
-- verify shards for numbers_hash_other are still valid
-- since copy has failed altogether
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash_other'::regclass order by placementid;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
560176 | 1 | localhost | 57637
560176 | 1 | localhost | 57637
560177 | 1 | localhost | 57637
560177 | 1 | localhost | 57638
560178 | 1 | localhost | 57638
560178 | 1 | localhost | 57637
560179 | 1 | localhost | 57637
560179 | 1 | localhost | 57638
(8 rows)
-- re-enable test_user on the first worker
\c - :default_user - :worker_1_port
ALTER USER test_user WITH login;
-- there is a dangling shard in worker_2, drop it
\c - test_user - :worker_2_port
DROP TABLE numbers_hash_other_560176;
\c - test_user - :master_port
DROP TABLE numbers_hash;
DROP TABLE numbers_hash_other;
DROP TABLE numbers_reference;
\c - :default_user
-- test copy failure inside the node
-- it will be done by changing definition of a shard table
SET citus.shard_count to 4;
CREATE TABLE numbers_hash(a int, b int);
SELECT create_distributed_table('numbers_hash', 'a');
create_distributed_table
--------------------------
(1 row)
\c - - - :worker_1_port
ALTER TABLE numbers_hash_560180 ADD COLUMN c int;
\c - - - :master_port
-- operation will fail to modify a shard and roll back
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
ERROR: row field count is 2, expected 3
DETAIL: (null)
-- verify no row is inserted
SELECT * FROM numbers_hash;
a | b
---+---
(0 rows)
-- verify shard is still marked as valid
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash'::regclass order by placementid;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
560180 | 1 | localhost | 57637
560180 | 1 | localhost | 57638
560181 | 1 | localhost | 57638
560181 | 1 | localhost | 57637
560182 | 1 | localhost | 57637
560182 | 1 | localhost | 57638
560183 | 1 | localhost | 57638
560183 | 1 | localhost | 57637
(8 rows)
DROP TABLE numbers_hash;
SELECT * FROM run_command_on_workers('DROP USER test_user');
nodename | nodeport | success | result
-----------+----------+---------+-----------
localhost | 57637 | t | DROP ROLE
localhost | 57638 | t | DROP ROLE
(2 rows)
DROP USER test_user;

View File

@ -581,12 +581,14 @@ SELECT master_add_node('localhost', :worker_2_port);
SELECT shardid, nodename, nodeport SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass; WHERE logicalrelid='mx_ref'::regclass
ORDER BY shardid, nodeport;
\c - - - :worker_1_port \c - - - :worker_1_port
SELECT shardid, nodename, nodeport SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass; WHERE logicalrelid='mx_ref'::regclass
ORDER BY shardid, nodeport;
\c - - - :master_port \c - - - :master_port
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);

View File

@ -832,6 +832,11 @@ CREATE USER test_user;
CREATE TABLE reference_failure_test (key int, value int); CREATE TABLE reference_failure_test (key int, value int);
SELECT create_reference_table('reference_failure_test'); SELECT create_reference_table('reference_failure_test');
-- create a hash distributed table
SET citus.shard_count TO 4;
CREATE TABLE numbers_hash_failure_test(key int, value int);
SELECT create_distributed_table('numbers_hash_failure_test', 'key');
-- ensure that the shard is created for this user -- ensure that the shard is created for this user
\c - test_user - :worker_1_port \c - test_user - :worker_1_port
\dt reference_failure_test_1200015 \dt reference_failure_test_1200015
@ -851,9 +856,16 @@ BEGIN;
INSERT INTO reference_failure_test VALUES (1, '1'); INSERT INTO reference_failure_test VALUES (1, '1');
COMMIT; COMMIT;
BEGIN;
COPY reference_failure_test FROM STDIN WITH (FORMAT 'csv');
2,2
\.
COMMIT;
-- show that no data go through the table and shard states are good -- show that no data go through the table and shard states are good
SELECT * FROM reference_failure_test; SELECT * FROM reference_failure_test;
-- all placements should be healthy -- all placements should be healthy
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp, FROM pg_dist_shard_placement AS sp,
@ -863,10 +875,60 @@ AND s.logicalrelid = 'reference_failure_test'::regclass
GROUP BY s.logicalrelid, sp.shardstate GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate; ORDER BY s.logicalrelid, sp.shardstate;
BEGIN;
COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
1,1
2,2
\.
-- some placements are invalid before abort
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
ORDER BY shardid, nodeport;
ABORT;
-- verify nothing is inserted
SELECT count(*) FROM numbers_hash_failure_test;
-- all placements to be marked valid
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
ORDER BY shardid, nodeport;
BEGIN;
COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
1,1
2,2
\.
-- check shard states before commit
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
ORDER BY shardid, nodeport;
COMMIT;
-- expect some placements to be marked invalid after commit
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
ORDER BY shardid, nodeport;
-- verify data is inserted
SELECT count(*) FROM numbers_hash_failure_test;
-- connect back to the worker and set rename the test_user back -- connect back to the worker and set rename the test_user back
\c - :default_user - :worker_1_port \c - :default_user - :worker_1_port
ALTER USER test_user_new RENAME TO test_user; ALTER USER test_user_new RENAME TO test_user;
-- connect back to the master with the proper user to continue the tests -- connect back to the master with the proper user to continue the tests
\c - :default_user - :master_port \c - :default_user - :master_port
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second; DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second,
reference_failure_test, numbers_hash_failure_test;
SELECT * FROM run_command_on_workers('DROP USER test_user');
DROP USER test_user;

View File

@ -467,8 +467,9 @@ SELECT
FROM FROM
pg_dist_shard_placement pg_dist_shard_placement
WHERE WHERE
nodeport = :worker_2_port; nodeport = :worker_2_port
ORDER BY
shardid;
\c - - - :master_port \c - - - :master_port
SELECT master_remove_node('localhost', :worker_2_port); SELECT master_remove_node('localhost', :worker_2_port);