From d0c76407b83d651ebdd1c610e01f79b835161324 Mon Sep 17 00:00:00 2001
From: Marco Slot
Date: Tue, 24 Jan 2017 12:24:01 +0100
Subject: [PATCH 1/6] Set placement to inactive on connection failure in COPY

---
 src/backend/distributed/commands/multi_copy.c | 25 ++++++++++---------
 1 file changed, 13 insertions(+), 12 deletions(-)

diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c
index 71ab02d35..c2419ba6b 100644
--- a/src/backend/distributed/commands/multi_copy.c
+++ b/src/backend/distributed/commands/multi_copy.c
@@ -834,7 +834,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
                    bool stopOnFailure, bool useBinaryCopyFormat)
 {
     List *finalizedPlacementList = NIL;
-    List *failedPlacementList = NIL;
+    int failedPlacementCount = 0;
     ListCell *placementCell = NULL;
     List *connectionList = NULL;
     int64 shardId = shardConnections->shardId;
@@ -863,8 +863,6 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
     foreach(placementCell, finalizedPlacementList)
     {
         ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
-        char *nodeName = placement->nodeName;
-        int nodePort = placement->nodePort;
         char *nodeUser = CurrentUserName();
         MultiConnection *connection = NULL;
         uint32 connectionFlags = FOR_DML;
@@ -877,12 +875,16 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
         {
             if (stopOnFailure)
             {
-                ereport(ERROR, (errmsg("could not open connection to %s:%d",
-                                       nodeName, nodePort)));
+                ReportConnectionError(connection, ERROR);
             }
+            else
+            {
+                ReportConnectionError(connection, WARNING);
+                MarkRemoteTransactionFailed(connection, true);
 
-            failedPlacementList = lappend(failedPlacementList, placement);
-            continue;
+                failedPlacementCount++;
+                continue;
+            }
         }
 
         /*
@@ -907,8 +909,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
 
             PQclear(result);
 
-            /* failed placements will be invalidated by transaction machinery */
-            failedPlacementList = lappend(failedPlacementList, placement);
+            failedPlacementCount++;
             continue;
         }
 
@@ -917,9 +918,9 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
     }
 
     /* if all placements failed, error out */
-    if (list_length(failedPlacementList) == list_length(finalizedPlacementList))
+    if (failedPlacementCount == list_length(finalizedPlacementList))
     {
-        ereport(ERROR, (errmsg("could not find any active placements")));
+        ereport(ERROR, (errmsg("could not connect to any active placements")));
     }
 
     /*
@@ -927,7 +928,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
     * never reach this point. This is the case for reference tables and
     * copy from worker nodes.
     */
-    Assert(!stopOnFailure || list_length(failedPlacementList) == 0);
+    Assert(!stopOnFailure || failedPlacementCount == 0);
 
     shardConnections->connectionList = connectionList;
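For illustration, here is roughly how the new code path behaves from a client's point of view. This is an editorial sketch, not part of the patch: the table name is hypothetical, while the warning and error strings are the ones introduced above and exercised by the regression tests added later in this series.

-- sketch: COPY into a hash-distributed table replicated to two workers,
-- one of which (localhost:57637 here) is unreachable
CREATE TABLE items (a int, b int);
SELECT create_distributed_table('items', 'a');

COPY items FROM STDIN WITH (FORMAT 'csv');
1,1
2,2
\.
-- WARNING: connection error: localhost:57637
-- the COPY continues on the reachable placements; had every placement
-- been unreachable, it would instead fail with:
-- ERROR: could not connect to any active placements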
From b1626887d5eabbc7ac31a5503c752e3f6f5579f6 Mon Sep 17 00:00:00 2001
From: Marco Slot
Date: Tue, 24 Jan 2017 12:48:52 +0100
Subject: [PATCH 2/6] Don't mark placements inactive in COPY after successful connection

---
 src/backend/distributed/commands/multi_copy.c  | 15 +++------------
 .../regress/expected/multi_modifying_xacts.out |  4 ++--
 2 files changed, 5 insertions(+), 14 deletions(-)

diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c
index c2419ba6b..48de9ee86 100644
--- a/src/backend/distributed/commands/multi_copy.c
+++ b/src/backend/distributed/commands/multi_copy.c
@@ -888,14 +888,11 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
         }
 
         /*
-         * If errors are supposed to cause immediate aborts (i.e. we don't
+         * Errors are supposed to cause immediate aborts (i.e. we don't
          * want to/can't invalidate placements), mark the connection as
          * critical so later errors cause failures.
          */
-        if (stopOnFailure)
-        {
-            MarkRemoteTransactionCritical(connection);
-        }
+        MarkRemoteTransactionCritical(connection);
         ClaimConnectionExclusively(connection);
         RemoteTransactionBeginIfNecessary(connection);
         copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId,
@@ -904,13 +901,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
 
         if (PQresultStatus(result) != PGRES_COPY_IN)
         {
-            ReportConnectionError(connection, WARNING);
-            MarkRemoteTransactionFailed(connection, true);
-
-            PQclear(result);
-
-            failedPlacementCount++;
-            continue;
+            ReportResultError(connection, result, ERROR);
         }
 
         PQclear(result);
diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out
index 03c443049..d421c0a5e 100644
--- a/src/test/regress/expected/multi_modifying_xacts.out
+++ b/src/test/regress/expected/multi_modifying_xacts.out
@@ -337,9 +337,9 @@ DELETE FROM researchers WHERE lab_id = 6;
 \copy researchers FROM STDIN delimiter ','
 COMMIT;
 WARNING: illegal value
-WARNING: failed to commit transaction on localhost:57638
+WARNING: failed to commit critical transaction on localhost:57638, metadata is likely out of sync
 WARNING: illegal value
-WARNING: failed to commit transaction on localhost:57637
+WARNING: failed to commit critical transaction on localhost:57637, metadata is likely out of sync
 WARNING: could not commit transaction for shard 1200001 on any active node
 ERROR: could not commit transaction on any active node
 \unset VERBOSITY
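In client terms, the change shows up at commit time, as the updated expected output above demonstrates. A sketch (the researchers table and the "illegal value" failure come from the surrounding multi_modifying_xacts test; only the session framing here is editorial):

BEGIN;
\copy researchers FROM STDIN delimiter ','
COMMIT;
-- WARNING: illegal value
-- WARNING: failed to commit critical transaction on localhost:57638, metadata is likely out of sync
-- ERROR: could not commit transaction on any active node
-- once a connection has been opened successfully it is critical, so an
-- error on it surfaces as a commit failure instead of silently marking
-- the placement inactive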
From f56454360ce4b82d6bd27a95c7d0148907ec54db Mon Sep 17 00:00:00 2001
From: Marco Slot
Date: Tue, 24 Jan 2017 12:55:15 +0100
Subject: [PATCH 3/6] Mark failed placements as inactive immediately after COPY

---
 src/backend/distributed/commands/multi_copy.c | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c
index 48de9ee86..b69a06799 100644
--- a/src/backend/distributed/commands/multi_copy.c
+++ b/src/backend/distributed/commands/multi_copy.c
@@ -536,6 +536,9 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
     EndCopyFrom(copyState);
     heap_close(distributedRelation, NoLock);
 
+    /* mark failed placements as inactive */
+    CheckForFailedPlacements(true, CoordinatedTransactionUses2PC);
+
     CHECK_FOR_INTERRUPTS();
 
     if (completionTag != NULL)
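With CheckForFailedPlacements called at the end of CopyToExistingShards, the demotion becomes visible immediately, even before the transaction commits. A sketch of the pattern (the same one the transaction tests in patch 6 verify; the table name matches the tests that follow):

BEGIN;
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
-- WARNING: connection error: localhost:57637

-- still inside the transaction, failed placements already show shardstate 3
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
WHERE logicalrelid = 'numbers_hash'::regclass
ORDER BY shardid, nodeport;
ABORT;  -- rolling back also rolls back the demotion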
From 51941114203a6028a13294663bed3855c9bc0191 Mon Sep 17 00:00:00 2001
From: Murat Tuncer
Date: Wed, 25 Jan 2017 17:48:15 +0300
Subject: [PATCH 4/6] Add failure case for regression tests

---
 src/test/regress/input/multi_copy.source  | 201 +++++++++++++++
 src/test/regress/output/multi_copy.source | 287 ++++++++++++++++++++++
 2 files changed, 488 insertions(+)

diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source
index 1708dcf12..d20c5c4c1 100644
--- a/src/test/regress/input/multi_copy.source
+++ b/src/test/regress/input/multi_copy.source
@@ -538,3 +538,204 @@ SELECT master_create_distributed_table('composite_partition_column_table', 'comp
 1,"(1,1)"
 2,"(2,2)"
 \.
+
+
+-- Test that COPY on append-distributed tables does not create shards on removed workers
+CREATE TABLE numbers_append (a int, b int);
+SELECT master_create_distributed_table('numbers_append', 'a', 'append');
+
+-- no shards are created yet
+SELECT shardid, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
+
+COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
+1,1
+2,2
+\.
+
+COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
+3,5
+4,6
+\.
+
+-- verify there are shards at both workers
+SELECT shardid, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
+
+-- disable the first node
+SELECT master_disable_node('localhost', :worker_1_port);
+-- set replication factor to 1 so that the copy will
+-- succeed without a replication count error
+SET citus.shard_replication_factor TO 1;
+
+-- add two new shards and verify they are created at the other node
+COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
+5,7
+6,8
+\.
+
+COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
+7,9
+8,10
+\.
+
+SELECT shardid, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
+
+-- add the node back
+SELECT master_add_node('localhost', :worker_1_port);
+RESET citus.shard_replication_factor;
+-- add two new shards and verify they are created at both workers
+COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
+9,11
+10,12
+\.
+
+COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
+11,13
+12,14
+\.
+
+SELECT shardid, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
+DROP TABLE numbers_append;
+
+-- Test copy failures against connection failures
+-- switch to a test user, it was previously created
+\c - test_user
+SET citus.shard_count to 4;
+CREATE TABLE numbers_hash (a int, b int);
+SELECT create_distributed_table('numbers_hash', 'a');
+
+COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
+1,1
+2,2
+3,3
+4,4
+5,5
+6,6
+7,7
+8,8
+\.
+
+-- verify each placement is active
+SELECT shardid, shardstate, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_hash'::regclass order by placementid;
+
+-- create a reference table
+CREATE TABLE numbers_reference(a int, b int);
+SELECT create_reference_table('numbers_reference');
+COPY numbers_reference FROM STDIN WITH (FORMAT 'csv');
+1,1
+2,2
+\.
+
+-- create another hash distributed table
+CREATE TABLE numbers_hash_other(a int, b int);
+SELECT create_distributed_table('numbers_hash_other', 'a');
+SELECT shardid, shardstate, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_hash_other'::regclass order by placementid;
+
+-- manually corrupt pg_dist_shard_placement such that both copies of one shard are placed on
+-- worker_1. This is to test the behavior when no replica of a shard is accessible.
+-- The whole copy operation is supposed to fail and roll back.
+\c - :default_user
+UPDATE pg_dist_shard_placement SET nodeport = :worker_1_port WHERE shardid = 560176;
+
+-- disable test_user on the first worker
+\c - :default_user - :worker_1_port
+ALTER USER test_user WITH nologin;
+\c - test_user - :master_port
+
+-- reissue copy
+COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
+1,1
+2,2
+3,3
+4,4
+5,5
+6,6
+7,7
+8,8
+\.
+
+-- verify shards in the first worker are marked invalid
+SELECT shardid, shardstate, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_hash'::regclass order by placementid;
+
+-- try to insert into a reference table; the copy should fail
+COPY numbers_reference FROM STDIN WITH (FORMAT 'csv');
+3,1
+4,2
+\.
+
+-- verify shards for the reference table are still valid
+SELECT shardid, shardstate, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_reference'::regclass order by placementid;
+
+
+-- try to insert into numbers_hash_other. The copy should fail and roll back
+-- since it cannot insert into either copy of a shard. Shards are expected to
+-- stay valid since the operation is rolled back.
+COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv');
+1,1
+2,2
+3,3
+\.
+
+-- verify shards for numbers_hash_other are still valid
+-- since the copy failed altogether
+SELECT shardid, shardstate, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_hash_other'::regclass order by placementid;
+
+-- re-enable test_user on the first worker
+\c - :default_user - :worker_1_port
+ALTER USER test_user WITH login;
+\c - test_user - :master_port
+
+DROP TABLE numbers_hash;
+DROP TABLE numbers_hash_other;
+DROP TABLE numbers_reference;
+\c - :default_user
+
+-- test copy failure inside the node
+-- it will be done by changing the definition of a shard table
+SET citus.shard_count to 4;
+CREATE TABLE numbers_hash(a int, b int);
+SELECT create_distributed_table('numbers_hash', 'a');
+
+\c - - - :worker_1_port
+ALTER TABLE numbers_hash_560180 ADD COLUMN c int;
+\c - - - :master_port
+
+-- operation will fail to modify a shard and roll back
+COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
+1,1
+2,2
+3,3
+4,4
+5,5
+6,6
+7,7
+8,8
+\.
+
+-- verify no row is inserted
+SELECT * FROM numbers_hash;
+
+-- verify shard is still marked as valid
+SELECT shardid, shardstate, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_hash'::regclass order by placementid;
+
+DROP TABLE numbers_hash;
+
diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source
index 4722a83c6..5c26f6649 100644
--- a/src/test/regress/output/multi_copy.source
+++ b/src/test/regress/output/multi_copy.source
@@ -708,3 +708,290 @@ CONTEXT: while executing command on localhost:57638
 WARNING: could not get statistics for shard public.composite_partition_column_table_560164
 DETAIL: Setting shard statistics to NULL
 ERROR: failure on connection marked as essential: localhost:57637
+-- Test that COPY on append-distributed tables does not create shards on removed workers
+CREATE TABLE numbers_append (a int, b int);
+SELECT master_create_distributed_table('numbers_append', 'a', 'append');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+-- no shards are created yet
+SELECT shardid, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
+ shardid | nodename | nodeport
+---------+----------+----------
+(0 rows)
+
+COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
+COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
+-- verify there are shards at both workers
+SELECT shardid, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
+ shardid | nodename  | nodeport
+---------+-----------+----------
+  560165 | localhost |    57637
+  560165 | localhost |    57638
+  560166 | localhost |    57638
+  560166 | localhost |    57637
+(4 rows)
+
+-- disable the first node
+SELECT master_disable_node('localhost', :worker_1_port);
+NOTICE: Node localhost:57637 has active shard placements. Some queries may fail after this operation. Use select master_add_node('localhost', 57637) to add this node back.
+ master_disable_node
+---------------------
+
+(1 row)
+
+-- set replication factor to 1 so that the copy will
+-- succeed without a replication count error
+SET citus.shard_replication_factor TO 1;
+-- add two new shards and verify they are created at the other node
+COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
+COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
+SELECT shardid, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
+ shardid | nodename  | nodeport
+---------+-----------+----------
+  560165 | localhost |    57637
+  560165 | localhost |    57638
+  560166 | localhost |    57638
+  560166 | localhost |    57637
+  560167 | localhost |    57638
+  560168 | localhost |    57638
+(6 rows)
+
+-- add the node back
+SELECT master_add_node('localhost', :worker_1_port);
+NOTICE: Replicating reference table "nation" to all workers
+NOTICE: Replicating reference table "supplier" to all workers
+NOTICE: Replicating reference table "reference_failure_test" to all workers
+ master_add_node
+---------------------------------
+ (3,3,localhost,57637,default,f)
+(1 row)
+
+RESET citus.shard_replication_factor;
+-- add two new shards and verify they are created at both workers
+COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
+COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
+SELECT shardid, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
+ shardid | nodename  | nodeport
+---------+-----------+----------
+  560165 | localhost |    57637
+  560165 | localhost |    57638
+  560166 | localhost |    57638
+  560166 | localhost |    57637
+  560167 | localhost |    57638
+  560168 | localhost |    57638
+  560169 | localhost |    57637
+  560169 | localhost |    57638
+  560170 | localhost |    57638
+  560170 | localhost |    57637
+(10 rows)
+
+DROP TABLE numbers_append;
+-- Test copy failures against connection failures
+-- switch to a test user, it was previously created
+\c - test_user
+SET citus.shard_count to 4;
+CREATE TABLE numbers_hash (a int, b int);
+SELECT create_distributed_table('numbers_hash', 'a');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
+-- verify each placement is active
+SELECT shardid, shardstate, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_hash'::regclass order by placementid;
+ shardid | shardstate | nodename  | nodeport
+---------+------------+-----------+----------
+  560171 |          1 | localhost |    57637
+  560171 |          1 | localhost |    57638
+  560172 |          1 | localhost |    57638
+  560172 |          1 | localhost |    57637
+  560173 |          1 | localhost |    57637
+  560173 |          1 | localhost |    57638
+  560174 |          1 | localhost |    57638
+  560174 |          1 | localhost |    57637
+(8 rows)
+
+-- create a reference table
+CREATE TABLE numbers_reference(a int, b int);
+SELECT create_reference_table('numbers_reference');
+ create_reference_table
+------------------------
+
+(1 row)
+
+COPY numbers_reference FROM STDIN WITH (FORMAT 'csv');
+-- create another hash distributed table
+CREATE TABLE numbers_hash_other(a int, b int);
+SELECT create_distributed_table('numbers_hash_other', 'a');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+SELECT shardid, shardstate, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_hash_other'::regclass order by placementid;
+ shardid | shardstate | nodename  | nodeport
+---------+------------+-----------+----------
+  560176 |          1 | localhost |    57638
+  560176 |          1 | localhost |    57637
+  560177 |          1 | localhost |    57637
+  560177 |          1 | localhost |    57638
+  560178 |          1 | localhost |    57638
+  560178 |          1 | localhost |    57637
+  560179 |          1 | localhost |    57637
+  560179 |          1 | localhost |    57638
+(8 rows)
+
+-- manually corrupt pg_dist_shard_placement such that both copies of one shard are placed on
+-- worker_1. This is to test the behavior when no replica of a shard is accessible.
+-- The whole copy operation is supposed to fail and roll back.
+\c - :default_user
+UPDATE pg_dist_shard_placement SET nodeport = :worker_1_port WHERE shardid = 560176;
+-- disable test_user on the first worker
+\c - :default_user - :worker_1_port
+ALTER USER test_user WITH nologin;
+\c - test_user - :master_port
+-- reissue copy
+COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
+WARNING: connection error: localhost:57637
+DETAIL: FATAL: role "test_user" is not permitted to log in
+
+CONTEXT: COPY numbers_hash, line 1: "1,1"
+WARNING: connection error: localhost:57637
+DETAIL: FATAL: role "test_user" is not permitted to log in
+
+CONTEXT: COPY numbers_hash, line 2: "2,2"
+WARNING: connection error: localhost:57637
+DETAIL: FATAL: role "test_user" is not permitted to log in
+
+CONTEXT: COPY numbers_hash, line 3: "3,3"
+WARNING: connection error: localhost:57637
+DETAIL: FATAL: role "test_user" is not permitted to log in
+
+CONTEXT: COPY numbers_hash, line 6: "6,6"
+-- verify shards in the first worker are marked invalid
+SELECT shardid, shardstate, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_hash'::regclass order by placementid;
+ shardid | shardstate | nodename  | nodeport
+---------+------------+-----------+----------
+  560171 |          3 | localhost |    57637
+  560171 |          1 | localhost |    57638
+  560172 |          1 | localhost |    57638
+  560172 |          3 | localhost |    57637
+  560173 |          3 | localhost |    57637
+  560173 |          1 | localhost |    57638
+  560174 |          1 | localhost |    57638
+  560174 |          3 | localhost |    57637
+(8 rows)
+
+-- try to insert into a reference table; the copy should fail
+COPY numbers_reference FROM STDIN WITH (FORMAT 'csv');
+ERROR: connection error: localhost:57637
+DETAIL: FATAL: role "test_user" is not permitted to log in
+
+CONTEXT: COPY numbers_reference, line 1: "3,1"
+-- verify shards for the reference table are still valid
+SELECT shardid, shardstate, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_reference'::regclass order by placementid;
+ shardid | shardstate | nodename  | nodeport
+---------+------------+-----------+----------
+  560175 |          1 | localhost |    57637
+  560175 |          1 | localhost |    57638
+(2 rows)
+
+-- try to insert into numbers_hash_other. The copy should fail and roll back
+-- since it cannot insert into either copy of a shard. Shards are expected to
+-- stay valid since the operation is rolled back.
+COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv');
+WARNING: connection error: localhost:57637
+DETAIL: FATAL: role "test_user" is not permitted to log in
+
+CONTEXT: COPY numbers_hash_other, line 1: "1,1"
+WARNING: connection error: localhost:57637
+DETAIL: FATAL: role "test_user" is not permitted to log in
+
+CONTEXT: COPY numbers_hash_other, line 1: "1,1"
+ERROR: could not connect to any active placements
+CONTEXT: COPY numbers_hash_other, line 1: "1,1"
+-- verify shards for numbers_hash_other are still valid
+-- since the copy failed altogether
+SELECT shardid, shardstate, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_hash_other'::regclass order by placementid;
+ shardid | shardstate | nodename  | nodeport
+---------+------------+-----------+----------
+  560176 |          1 | localhost |    57637
+  560176 |          1 | localhost |    57637
+  560177 |          1 | localhost |    57637
+  560177 |          1 | localhost |    57638
+  560178 |          1 | localhost |    57638
+  560178 |          1 | localhost |    57637
+  560179 |          1 | localhost |    57637
+  560179 |          1 | localhost |    57638
+(8 rows)
+
+-- re-enable test_user on the first worker
+\c - :default_user - :worker_1_port
+ALTER USER test_user WITH login;
+\c - test_user - :master_port
+DROP TABLE numbers_hash;
+DROP TABLE numbers_hash_other;
+DROP TABLE numbers_reference;
+\c - :default_user
+-- test copy failure inside the node
+-- it will be done by changing the definition of a shard table
+SET citus.shard_count to 4;
+CREATE TABLE numbers_hash(a int, b int);
+SELECT create_distributed_table('numbers_hash', 'a');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+\c - - - :worker_1_port
+ALTER TABLE numbers_hash_560180 ADD COLUMN c int;
+\c - - - :master_port
+-- operation will fail to modify a shard and roll back
+COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
+ERROR: row field count is 2, expected 3
+DETAIL: (null)
+-- verify no row is inserted
+SELECT * FROM numbers_hash;
+ a | b
+---+---
+(0 rows)
+
+-- verify shard is still marked as valid
+SELECT shardid, shardstate, nodename, nodeport
+  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
+  WHERE logicalrelid = 'numbers_hash'::regclass order by placementid;
+ shardid | shardstate | nodename  | nodeport
+---------+------------+-----------+----------
+  560180 |          1 | localhost |    57637
+  560180 |          1 | localhost |    57638
+  560181 |          1 | localhost |    57638
+  560181 |          1 | localhost |    57637
+  560182 |          1 | localhost |    57637
+  560182 |          1 | localhost |    57638
+  560183 |          1 | localhost |    57638
+  560183 |          1 | localhost |    57637
+(8 rows)
+
+DROP TABLE numbers_hash;
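A note for reading the expected output above, added editorially: in pg_dist_shard_placement, shardstate 1 denotes a healthy (finalized) placement and shardstate 3 an inactive one, so the pattern to look for after a partially failed COPY is a 1/3 split across the two replicas of each shard:

-- sketch: the query used throughout these tests, with the states annotated
SELECT shardid, shardstate, nodename, nodeport
  FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
  WHERE logicalrelid = 'numbers_hash'::regclass order by placementid;
-- shardstate 3 rows are the placements on the unreachable worker (57637);
-- the shardstate 1 rows on 57638 continue to serve reads and writes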
From 1107439ade5a797402ffaad26b9d725ec525f769 Mon Sep 17 00:00:00 2001
From: Murat Tuncer
Date: Wed, 25 Jan 2017 19:08:33 +0300
Subject: [PATCH 5/6] Fix dependent tests

---
 .../regress/expected/multi_metadata_sync.out   |  6 +++--
 .../expected/multi_modifying_xacts.out         | 10 +++++++-
 .../multi_remove_node_reference_table.out      |  5 ++--
 src/test/regress/input/multi_copy.source       | 14 +++++++++--
 src/test/regress/output/multi_copy.source      | 24 +++++++++++++++++--
 src/test/regress/sql/multi_metadata_sync.sql   |  6 +++--
 .../regress/sql/multi_modifying_xacts.sql      |  5 +++-
 .../sql/multi_remove_node_reference_table.sql  |  5 ++--
 8 files changed, 61 insertions(+), 14 deletions(-)

diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out
index 089193c52..b7f2c2a39 100644
--- a/src/test/regress/expected/multi_metadata_sync.out
+++ b/src/test/regress/expected/multi_metadata_sync.out
@@ -1324,7 +1324,8 @@ NOTICE: Replicating reference table "mx_ref" to all workers
 SELECT shardid, nodename, nodeport
 FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
-WHERE logicalrelid='mx_ref'::regclass;
+WHERE logicalrelid='mx_ref'::regclass
+ORDER BY shardid, nodeport;
  shardid | nodename  | nodeport
 ---------+-----------+----------
  1310184 | localhost |    57637
@@ -1334,7 +1335,8 @@ WHERE logicalrelid='mx_ref'::regclass;
 \c - - - :worker_1_port
 SELECT shardid, nodename, nodeport
 FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
-WHERE logicalrelid='mx_ref'::regclass;
+WHERE logicalrelid='mx_ref'::regclass
+ORDER BY shardid, nodeport;
  shardid | nodename  | nodeport
 ---------+-----------+----------
  1310184 | localhost |    57637
diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out
index d421c0a5e..662007975 100644
--- a/src/test/regress/expected/multi_modifying_xacts.out
+++ b/src/test/regress/expected/multi_modifying_xacts.out
@@ -1176,4 +1176,12 @@ ORDER BY s.logicalrelid, sp.shardstate;
 ALTER USER test_user_new RENAME TO test_user;
 -- connect back to the master with the proper user to continue the tests
 \c - :default_user - :master_port
-DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second;
+DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second, reference_failure_test;
+SELECT * FROM run_command_on_workers('DROP USER test_user');
+ nodename  | nodeport | success |  result
+-----------+----------+---------+-----------
+ localhost |    57637 | t       | DROP ROLE
+ localhost |    57638 | t       | DROP ROLE
+(2 rows)
+
+DROP USER test_user;
diff --git a/src/test/regress/expected/multi_remove_node_reference_table.out b/src/test/regress/expected/multi_remove_node_reference_table.out
index 8446905e7..0f6934fe4 100644
--- a/src/test/regress/expected/multi_remove_node_reference_table.out
+++ b/src/test/regress/expected/multi_remove_node_reference_table.out
@@ -777,14 +777,15 @@ SELECT
 FROM
     pg_dist_shard_placement
 WHERE
-    nodeport = :worker_2_port;
+    nodeport = :worker_2_port
+ORDER BY
+    shardid;
  shardid | shardstate | shardlength | nodename  | nodeport
 ---------+------------+-------------+-----------+----------
  1380001 |          1 |           0 | localhost |    57638
 1380002 |          1 |           0 | localhost |    57638
 (2 rows)
 
-
 \c - - - :master_port
 SELECT master_remove_node('localhost', :worker_2_port);
  master_remove_node
diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source
index d20c5c4c1..16703c580 100644
--- a/src/test/regress/input/multi_copy.source
+++ b/src/test/regress/input/multi_copy.source
@@ -605,7 +605,10 @@ SELECT shardid, nodename, nodeport
 DROP TABLE numbers_append;
 
 -- Test copy failures against connection failures
--- switch to a test user, it was previously created
+-- create and switch to test user
+CREATE USER test_user;
+SELECT * FROM run_command_on_workers('CREATE USER test_user');
+
 \c - test_user
 SET citus.shard_count to 4;
 CREATE TABLE numbers_hash (a int, b int);
@@ -700,11 +703,17 @@ SELECT shardid, shardstate, nodename, nodeport
 -- re-enable test_user on the first worker
 \c - :default_user - :worker_1_port
 ALTER USER test_user WITH login;
+
+-- there is a dangling shard in worker_2, drop it
+\c - test_user - :worker_2_port
+DROP TABLE numbers_hash_other_560176;
+
 \c - test_user - :master_port
 
 DROP TABLE numbers_hash;
 DROP TABLE numbers_hash_other;
 DROP TABLE numbers_reference;
+
 \c - :default_user
 
 -- test copy failure inside the node
@@ -738,4 +747,5 @@ SELECT shardid, shardstate, nodename, nodeport
   WHERE logicalrelid = 'numbers_hash'::regclass order by placementid;
 
 DROP TABLE numbers_hash;
-
+SELECT * FROM run_command_on_workers('DROP USER test_user');
+DROP USER test_user;
diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source
index 5c26f6649..9b02124f6 100644
--- a/src/test/regress/output/multi_copy.source
+++ b/src/test/regress/output/multi_copy.source
@@ -769,7 +769,6 @@ SELECT shardid, nodename, nodeport
 SELECT master_add_node('localhost', :worker_1_port);
 NOTICE: Replicating reference table "nation" to all workers
 NOTICE: Replicating reference table "supplier" to all workers
-NOTICE: Replicating reference table "reference_failure_test" to all workers
  master_add_node
 ---------------------------------
  (3,3,localhost,57637,default,f)
@@ -798,7 +797,17 @@ SELECT shardid, nodename, nodeport
 DROP TABLE numbers_append;
 -- Test copy failures against connection failures
--- switch to a test user, it was previously created
+-- create and switch to test user
+CREATE USER test_user;
+NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
+HINT: Connect to worker nodes directly to manually create all necessary users and roles.
+SELECT * FROM run_command_on_workers('CREATE USER test_user');
+ nodename  | nodeport | success |   result
+-----------+----------+---------+-------------
+ localhost |    57637 | t       | CREATE ROLE
+ localhost |    57638 | t       | CREATE ROLE
+(2 rows)
+
 \c - test_user
 SET citus.shard_count to 4;
 CREATE TABLE numbers_hash (a int, b int);
@@ -950,6 +959,9 @@ SELECT shardid, shardstate, nodename, nodeport
 -- re-enable test_user on the first worker
 \c - :default_user - :worker_1_port
 ALTER USER test_user WITH login;
+-- there is a dangling shard in worker_2, drop it
+\c - test_user - :worker_2_port
+DROP TABLE numbers_hash_other_560176;
 \c - test_user - :master_port
 DROP TABLE numbers_hash;
 DROP TABLE numbers_hash_other;
 DROP TABLE numbers_reference;
@@ -995,3 +1007,11 @@ SELECT shardid, shardstate, nodename, nodeport
 (8 rows)
 
 DROP TABLE numbers_hash;
+SELECT * FROM run_command_on_workers('DROP USER test_user');
+ nodename  | nodeport | success |  result
+-----------+----------+---------+-----------
+ localhost |    57637 | t       | DROP ROLE
+ localhost |    57638 | t       | DROP ROLE
+(2 rows)
+
+DROP USER test_user;
diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql
index 3d431abc2..adb6fa7fe 100644
--- a/src/test/regress/sql/multi_metadata_sync.sql
+++ b/src/test/regress/sql/multi_metadata_sync.sql
@@ -581,12 +581,14 @@ SELECT master_add_node('localhost', :worker_2_port);
 SELECT shardid, nodename, nodeport
 FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
-WHERE logicalrelid='mx_ref'::regclass;
+WHERE logicalrelid='mx_ref'::regclass
+ORDER BY shardid, nodeport;
 
 \c - - - :worker_1_port
 SELECT shardid, nodename, nodeport
 FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
-WHERE logicalrelid='mx_ref'::regclass;
+WHERE logicalrelid='mx_ref'::regclass
+ORDER BY shardid, nodeport;
 
 \c - - - :master_port
 INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql
index 019ade9cc..08d9f46ad 100644
--- a/src/test/regress/sql/multi_modifying_xacts.sql
+++ b/src/test/regress/sql/multi_modifying_xacts.sql
@@ -869,4 +869,7 @@ ALTER USER test_user_new RENAME TO test_user;
 -- connect back to the master with the proper user to continue the tests
 \c - :default_user - :master_port
-DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second;
+DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second, reference_failure_test;
+
+SELECT * FROM run_command_on_workers('DROP USER test_user');
+DROP USER test_user;
\ No newline at end of file
diff --git a/src/test/regress/sql/multi_remove_node_reference_table.sql b/src/test/regress/sql/multi_remove_node_reference_table.sql
index 9c9769bde..fe44a50da 100644
--- a/src/test/regress/sql/multi_remove_node_reference_table.sql
+++ b/src/test/regress/sql/multi_remove_node_reference_table.sql
@@ -467,8 +467,9 @@ SELECT
 FROM
     pg_dist_shard_placement
 WHERE
-    nodeport = :worker_2_port;
-
+    nodeport = :worker_2_port
+ORDER BY
+    shardid;
 \c - - - :master_port
 SELECT master_remove_node('localhost', :worker_2_port);
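An editorial aside on the ORDER BY clauses this patch adds: rows from pg_dist_shard_placement come back in no guaranteed order, so an expected-output file that records them unordered can fail spuriously whenever the physical row order shifts. Pinning the order keeps the regression output stable, e.g.:

SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid = 'mx_ref'::regclass
ORDER BY shardid, nodeport;  -- deterministic output across runs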
From 0e635b69f0a6e02804f2606cd74f7dc3ac0b0cfc Mon Sep 17 00:00:00 2001
From: Murat Tuncer
Date: Thu, 26 Jan 2017 10:52:23 +0300
Subject: [PATCH 6/6] Add copy failure tests inside transactions

---
 .../expected/multi_modifying_xacts.out     | 112 +++++++++++++++++-
 .../regress/sql/multi_modifying_xacts.sql  |  61 +++++++++-
 2 files changed, 171 insertions(+), 2 deletions(-)

diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out
index 662007975..8d934ce52 100644
--- a/src/test/regress/expected/multi_modifying_xacts.out
+++ b/src/test/regress/expected/multi_modifying_xacts.out
@@ -1127,6 +1127,15 @@ SELECT create_reference_table('reference_failure_test');
 
 (1 row)
 
+-- create a hash distributed table
+SET citus.shard_count TO 4;
+CREATE TABLE numbers_hash_failure_test(key int, value int);
+SELECT create_distributed_table('numbers_hash_failure_test', 'key');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
 -- ensure that the shard is created for this user
 \c - test_user - :worker_1_port
 \dt reference_failure_test_1200015
@@ -1152,6 +1161,10 @@ INSERT INTO reference_failure_test VALUES (1, '1');
 WARNING: connection error: localhost:57637
 ERROR: failure on connection marked as essential: localhost:57637
 COMMIT;
+BEGIN;
+COPY reference_failure_test FROM STDIN WITH (FORMAT 'csv');
+ERROR: connection error: localhost:57637
+COMMIT;
 -- show that no data goes through the table and shard states are good
 SELECT * FROM reference_failure_test;
  key | value
 -----+-------
 (0 rows)
 
 -- all placements should be healthy
 SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
 FROM pg_dist_shard_placement AS sp,
      pg_dist_shard AS s
 WHERE sp.shardid = s.shardid
 AND s.logicalrelid = 'reference_failure_test'::regclass
 GROUP BY s.logicalrelid, sp.shardstate
 ORDER BY s.logicalrelid, sp.shardstate;
       logicalrelid      | shardstate | count
 ------------------------+------------+-------
  reference_failure_test |          1 |     2
 (1 row)
 
+BEGIN;
+COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
+WARNING: connection error: localhost:57637
+WARNING: connection error: localhost:57637
+-- some placements are invalid before abort
+SELECT shardid, shardstate, nodename, nodeport
+FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
+WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
+ORDER BY shardid, nodeport;
+ shardid | shardstate | nodename  | nodeport
+---------+------------+-----------+----------
+ 1200016 |          3 | localhost |    57637
+ 1200016 |          1 | localhost |    57638
+ 1200017 |          1 | localhost |    57637
+ 1200017 |          1 | localhost |    57638
+ 1200018 |          1 | localhost |    57637
+ 1200018 |          1 | localhost |    57638
+ 1200019 |          3 | localhost |    57637
+ 1200019 |          1 | localhost |    57638
+(8 rows)
+
+ABORT;
+-- verify nothing is inserted
+SELECT count(*) FROM numbers_hash_failure_test;
+WARNING: connection error: localhost:57637
+WARNING: connection error: localhost:57637
+ count
+-------
+     0
+(1 row)
+
+-- all placements are expected to be marked valid
+SELECT shardid, shardstate, nodename, nodeport
+FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
+WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
+ORDER BY shardid, nodeport;
+ shardid | shardstate | nodename  | nodeport
+---------+------------+-----------+----------
+ 1200016 |          1 | localhost |    57637
+ 1200016 |          1 | localhost |    57638
+ 1200017 |          1 | localhost |    57637
+ 1200017 |          1 | localhost |    57638
+ 1200018 |          1 | localhost |    57637
+ 1200018 |          1 | localhost |    57638
+ 1200019 |          1 | localhost |    57637
+ 1200019 |          1 | localhost |    57638
+(8 rows)
+
+BEGIN;
+COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
+WARNING: connection error: localhost:57637
+WARNING: connection error: localhost:57637
+-- check shard states before commit
+SELECT shardid, shardstate, nodename, nodeport
+FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
+WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
+ORDER BY shardid, nodeport;
+ shardid | shardstate | nodename  | nodeport
+---------+------------+-----------+----------
+ 1200016 |          3 | localhost |    57637
+ 1200016 |          1 | localhost |    57638
+ 1200017 |          1 | localhost |    57637
+ 1200017 |          1 | localhost |    57638
+ 1200018 |          1 | localhost |    57637
+ 1200018 |          1 | localhost |    57638
+ 1200019 |          3 | localhost |    57637
+ 1200019 |          1 | localhost |    57638
+(8 rows)
+
+COMMIT;
+-- expect some placements to be marked invalid after commit
+SELECT shardid, shardstate, nodename, nodeport
+FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
+WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
+ORDER BY shardid, nodeport;
+ shardid | shardstate | nodename  | nodeport
+---------+------------+-----------+----------
+ 1200016 |          3 | localhost |    57637
+ 1200016 |          1 | localhost |    57638
+ 1200017 |          1 | localhost |    57637
+ 1200017 |          1 | localhost |    57638
+ 1200018 |          1 | localhost |    57637
+ 1200018 |          1 | localhost |    57638
+ 1200019 |          3 | localhost |    57637
+ 1200019 |          1 | localhost |    57638
+(8 rows)
+
+-- verify data is inserted
+SELECT count(*) FROM numbers_hash_failure_test;
+WARNING: connection error: localhost:57637
+WARNING: connection error: localhost:57637
+ count
+-------
+     2
+(1 row)
+
 -- connect back to the worker and rename the test_user back
 \c - :default_user - :worker_1_port
 ALTER USER test_user_new RENAME TO test_user;
 -- connect back to the master with the proper user to continue the tests
 \c - :default_user - :master_port
-DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second, reference_failure_test;
+DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second,
+    reference_failure_test, numbers_hash_failure_test;
 SELECT * FROM run_command_on_workers('DROP USER test_user');
  nodename  | nodeport | success |  result
 -----------+----------+---------+-----------
  localhost |    57637 | t       | DROP ROLE
  localhost |    57638 | t       | DROP ROLE
 (2 rows)
 
 DROP USER test_user;
diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql
index 08d9f46ad..037e78960 100644
--- a/src/test/regress/sql/multi_modifying_xacts.sql
+++ b/src/test/regress/sql/multi_modifying_xacts.sql
@@ -832,6 +832,11 @@ CREATE USER test_user;
 CREATE TABLE reference_failure_test (key int, value int);
 SELECT create_reference_table('reference_failure_test');
 
+-- create a hash distributed table
+SET citus.shard_count TO 4;
+CREATE TABLE numbers_hash_failure_test(key int, value int);
+SELECT create_distributed_table('numbers_hash_failure_test', 'key');
+
 -- ensure that the shard is created for this user
 \c - test_user - :worker_1_port
 \dt reference_failure_test_1200015
@@ -851,9 +856,16 @@ BEGIN;
 INSERT INTO reference_failure_test VALUES (1, '1');
 COMMIT;
 
+BEGIN;
+COPY reference_failure_test FROM STDIN WITH (FORMAT 'csv');
+2,2
+\.
+COMMIT;
+
 -- show that no data goes through the table and shard states are good
 SELECT * FROM reference_failure_test;
 
+
 -- all placements should be healthy
 SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
 FROM pg_dist_shard_placement AS sp,
      pg_dist_shard AS s
 WHERE sp.shardid = s.shardid
 AND s.logicalrelid = 'reference_failure_test'::regclass
 GROUP BY s.logicalrelid, sp.shardstate
 ORDER BY s.logicalrelid, sp.shardstate;
 
+BEGIN;
+COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
+1,1
+2,2
+\.
+
+-- some placements are invalid before abort
+SELECT shardid, shardstate, nodename, nodeport
+FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
+WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
+ORDER BY shardid, nodeport;
+
+ABORT;
+
+-- verify nothing is inserted
+SELECT count(*) FROM numbers_hash_failure_test;
+
+-- all placements are expected to be marked valid
+SELECT shardid, shardstate, nodename, nodeport
+FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
+WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
+ORDER BY shardid, nodeport;
+
+BEGIN;
+COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
+1,1
+2,2
+\.
+
+-- check shard states before commit
+SELECT shardid, shardstate, nodename, nodeport
+FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
+WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
+ORDER BY shardid, nodeport;
+
+COMMIT;
+
+-- expect some placements to be marked invalid after commit
+SELECT shardid, shardstate, nodename, nodeport
+FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
+WHERE logicalrelid = 'numbers_hash_failure_test'::regclass
+ORDER BY shardid, nodeport;
+
+-- verify data is inserted
+SELECT count(*) FROM numbers_hash_failure_test;
+
 -- connect back to the worker and rename the test_user back
 \c - :default_user - :worker_1_port
 ALTER USER test_user_new RENAME TO test_user;
 
 -- connect back to the master with the proper user to continue the tests
 \c - :default_user - :master_port
-DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second, reference_failure_test;
+DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second,
+    reference_failure_test, numbers_hash_failure_test;
 
 SELECT * FROM run_command_on_workers('DROP USER test_user');
 DROP USER test_user;
\ No newline at end of file
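Taken together, the series gives COPY on replicated tables the following transactional behavior, summarized here editorially from the expected output above (two workers, with localhost:57637 unreachable for test_user):

BEGIN;
COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
-- WARNING: connection error: localhost:57637
ABORT;
-- nothing is inserted, and every placement reverts to shardstate 1

BEGIN;
COPY numbers_hash_failure_test FROM STDIN WITH (FORMAT 'csv');
-- WARNING: connection error: localhost:57637
COMMIT;
-- rows are kept on the reachable placements, while the placements that
-- could not be written stay at shardstate 3 (inactive)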