mirror of https://github.com/citusdata/citus.git
Relay error message if DML fails on worker
parent
9a04b78980
commit
4bde83e1d2
|
@ -701,6 +701,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTask
|
||||||
ListCell *taskPlacementCell = NULL;
|
ListCell *taskPlacementCell = NULL;
|
||||||
ListCell *connectionCell = NULL;
|
ListCell *connectionCell = NULL;
|
||||||
int64 affectedTupleCount = -1;
|
int64 affectedTupleCount = -1;
|
||||||
|
int failureCount = 0;
|
||||||
bool resultsOK = false;
|
bool resultsOK = false;
|
||||||
bool gotResults = false;
|
bool gotResults = false;
|
||||||
|
|
||||||
|
@ -761,12 +762,14 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTask
|
||||||
* MarkFailedShardPlacements() to ensure future statements will not use this
|
* MarkFailedShardPlacements() to ensure future statements will not use this
|
||||||
* placement.
|
* placement.
|
||||||
*/
|
*/
|
||||||
|
failureCount++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
|
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
|
||||||
if (!queryOK)
|
if (!queryOK)
|
||||||
{
|
{
|
||||||
|
failureCount++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -782,6 +785,15 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTask
|
||||||
failOnError = true;
|
failOnError = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (failureCount + 1 == list_length(taskPlacementList))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* If we already failed on all other placements (possibly 0),
|
||||||
|
* relay errors directly.
|
||||||
|
*/
|
||||||
|
failOnError = true;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If caller is interested, store query results the first time
|
* If caller is interested, store query results the first time
|
||||||
* through. The output of the query's execution on other shards is
|
* through. The output of the query's execution on other shards is
|
||||||
|
@ -818,9 +830,18 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTask
|
||||||
resultsOK = true;
|
resultsOK = true;
|
||||||
gotResults = true;
|
gotResults = true;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
failureCount++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if all placements failed, error out */
|
/*
|
||||||
|
* If a command results in an error on all workers, we relay the last error
|
||||||
|
* in the loop above by setting failOnError. However, if all connections fail
|
||||||
|
* we still complete the loop without throwing an error. In that case, throw
|
||||||
|
* an error below.
|
||||||
|
*/
|
||||||
if (!resultsOK)
|
if (!resultsOK)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("could not modify any active placements")));
|
ereport(ERROR, (errmsg("could not modify any active placements")));
|
||||||
|
|
|
@ -139,8 +139,7 @@ FROM
|
||||||
WHERE
|
WHERE
|
||||||
user_id = 0;
|
user_id = 0;
|
||||||
WARNING: function public.evaluate_on_master(integer) does not exist
|
WARNING: function public.evaluate_on_master(integer) does not exist
|
||||||
WARNING: function public.evaluate_on_master(integer) does not exist
|
ERROR: function public.evaluate_on_master(integer) does not exist
|
||||||
ERROR: could not modify any active placements
|
|
||||||
\set VERBOSITY default
|
\set VERBOSITY default
|
||||||
-- add one more row
|
-- add one more row
|
||||||
INSERT INTO raw_events_first (user_id, time) VALUES
|
INSERT INTO raw_events_first (user_id, time) VALUES
|
||||||
|
|
|
@ -188,7 +188,8 @@ DETAIL: Key (id)=(32743) already exists.
|
||||||
CONTEXT: while executing command on localhost:57638
|
CONTEXT: while executing command on localhost:57638
|
||||||
-- INSERT, with RETURNING specified, failing with a non-constraint error
|
-- INSERT, with RETURNING specified, failing with a non-constraint error
|
||||||
INSERT INTO limit_orders VALUES (34153, 'LEE', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING id / 0;
|
INSERT INTO limit_orders VALUES (34153, 'LEE', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING id / 0;
|
||||||
ERROR: could not modify any active placements
|
ERROR: division by zero
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
SET client_min_messages TO DEFAULT;
|
SET client_min_messages TO DEFAULT;
|
||||||
-- commands with non-constant partition values are supported
|
-- commands with non-constant partition values are supported
|
||||||
INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
|
INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
|
||||||
|
@ -399,9 +400,8 @@ ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
-- Fourth: Perform an INSERT on the remaining node
|
-- Fourth: Perform an INSERT on the remaining node
|
||||||
INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
|
INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
|
||||||
WARNING: relation "public.limit_orders_750000" does not exist
|
ERROR: relation "public.limit_orders_750000" does not exist
|
||||||
CONTEXT: while executing command on localhost:57637
|
CONTEXT: while executing command on localhost:57637
|
||||||
ERROR: could not modify any active placements
|
|
||||||
-- Last: Verify worker is still healthy
|
-- Last: Verify worker is still healthy
|
||||||
SELECT count(*)
|
SELECT count(*)
|
||||||
FROM pg_dist_shard_placement AS sp,
|
FROM pg_dist_shard_placement AS sp,
|
||||||
|
|
|
@ -574,8 +574,7 @@ INSERT INTO objects VALUES (2, 'BAD');
|
||||||
WARNING: illegal value
|
WARNING: illegal value
|
||||||
INSERT INTO labs VALUES (8, 'Aperture Science');
|
INSERT INTO labs VALUES (8, 'Aperture Science');
|
||||||
INSERT INTO labs VALUES (9, 'BAD');
|
INSERT INTO labs VALUES (9, 'BAD');
|
||||||
WARNING: illegal value
|
ERROR: illegal value
|
||||||
ERROR: could not modify any active placements
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
-- data should NOT be persisted
|
-- data should NOT be persisted
|
||||||
SELECT * FROM objects WHERE id = 1;
|
SELECT * FROM objects WHERE id = 1;
|
||||||
|
@ -951,8 +950,7 @@ FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash();
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO reference_modifying_xacts VALUES (55, 10);
|
INSERT INTO reference_modifying_xacts VALUES (55, 10);
|
||||||
INSERT INTO hash_modifying_xacts VALUES (997, 1);
|
INSERT INTO hash_modifying_xacts VALUES (997, 1);
|
||||||
WARNING: illegal value
|
ERROR: illegal value
|
||||||
ERROR: could not modify any active placements
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
-- ensure that the value didn't go into the reference table
|
-- ensure that the value didn't go into the reference table
|
||||||
SELECT * FROM reference_modifying_xacts WHERE key = 55;
|
SELECT * FROM reference_modifying_xacts WHERE key = 55;
|
||||||
|
@ -1305,11 +1303,24 @@ WARNING: connection error: localhost:57637
|
||||||
2
|
2
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- connect back to the worker and set rename the test_user back
|
-- break the other node as well
|
||||||
\c - :default_user - :worker_1_port
|
\c - :default_user - :worker_2_port
|
||||||
ALTER USER test_user_new RENAME TO test_user;
|
ALTER USER test_user RENAME TO test_user_new;
|
||||||
|
\c - test_user - :master_port
|
||||||
|
-- fails on all shard placements
|
||||||
|
INSERT INTO numbers_hash_failure_test VALUES (2,2);
|
||||||
|
WARNING: connection error: localhost:57638
|
||||||
|
ERROR: could not modify any active placements
|
||||||
-- connect back to the master with the proper user to continue the tests
|
-- connect back to the master with the proper user to continue the tests
|
||||||
\c - :default_user - :master_port
|
\c - :default_user - :master_port
|
||||||
|
-- unbreak both nodes by renaming the user back to the original name
|
||||||
|
SELECT * FROM run_command_on_workers('ALTER USER test_user_new RENAME TO test_user');
|
||||||
|
nodename | nodeport | success | result
|
||||||
|
-----------+----------+---------+------------
|
||||||
|
localhost | 57637 | t | ALTER ROLE
|
||||||
|
localhost | 57638 | t | ALTER ROLE
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second,
|
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second,
|
||||||
reference_failure_test, numbers_hash_failure_test;
|
reference_failure_test, numbers_hash_failure_test;
|
||||||
SELECT * FROM run_command_on_workers('DROP USER test_user');
|
SELECT * FROM run_command_on_workers('DROP USER test_user');
|
||||||
|
|
|
@ -91,7 +91,8 @@ DETAIL: Key (id)=(32743) already exists.
|
||||||
CONTEXT: while executing command on localhost:57638
|
CONTEXT: while executing command on localhost:57638
|
||||||
-- INSERT, with RETURNING specified, failing with a non-constraint error
|
-- INSERT, with RETURNING specified, failing with a non-constraint error
|
||||||
INSERT INTO limit_orders_mx VALUES (34153, 'LEE', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING id / 0;
|
INSERT INTO limit_orders_mx VALUES (34153, 'LEE', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING id / 0;
|
||||||
ERROR: could not modify any active placements
|
ERROR: division by zero
|
||||||
|
CONTEXT: while executing command on localhost:57638
|
||||||
SET client_min_messages TO DEFAULT;
|
SET client_min_messages TO DEFAULT;
|
||||||
-- commands with non-constant partition values are unsupported
|
-- commands with non-constant partition values are unsupported
|
||||||
INSERT INTO limit_orders_mx VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
|
INSERT INTO limit_orders_mx VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
|
||||||
|
|
|
@ -242,8 +242,7 @@ FOR EACH ROW EXECUTE PROCEDURE reject_bad_mx();
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO labs_mx VALUES (7, 'E Corp');
|
INSERT INTO labs_mx VALUES (7, 'E Corp');
|
||||||
INSERT INTO objects_mx VALUES (2, 'BAD');
|
INSERT INTO objects_mx VALUES (2, 'BAD');
|
||||||
WARNING: illegal value
|
ERROR: illegal value
|
||||||
ERROR: could not modify any active placements
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
-- data should NOT be persisted
|
-- data should NOT be persisted
|
||||||
SELECT * FROM objects_mx WHERE id = 2;
|
SELECT * FROM objects_mx WHERE id = 2;
|
||||||
|
@ -262,8 +261,7 @@ SELECT * FROM labs_mx WHERE id = 7;
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO labs_mx VALUES (7, 'E Corp');
|
INSERT INTO labs_mx VALUES (7, 'E Corp');
|
||||||
INSERT INTO objects_mx VALUES (2, 'BAD');
|
INSERT INTO objects_mx VALUES (2, 'BAD');
|
||||||
WARNING: illegal value
|
ERROR: illegal value
|
||||||
ERROR: could not modify any active placements
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
-- data should NOT be persisted
|
-- data should NOT be persisted
|
||||||
SELECT * FROM objects_mx WHERE id = 2;
|
SELECT * FROM objects_mx WHERE id = 2;
|
||||||
|
@ -286,8 +284,7 @@ FOR EACH ROW EXECUTE PROCEDURE reject_bad_mx();
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO objects_mx VALUES (1, 'apple');
|
INSERT INTO objects_mx VALUES (1, 'apple');
|
||||||
INSERT INTO objects_mx VALUES (2, 'BAD');
|
INSERT INTO objects_mx VALUES (2, 'BAD');
|
||||||
WARNING: illegal value
|
ERROR: illegal value
|
||||||
ERROR: could not modify any active placements
|
|
||||||
INSERT INTO labs_mx VALUES (8, 'Aperture Science');
|
INSERT INTO labs_mx VALUES (8, 'Aperture Science');
|
||||||
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
INSERT INTO labs_mx VALUES (9, 'BAD');
|
INSERT INTO labs_mx VALUES (9, 'BAD');
|
||||||
|
@ -309,8 +306,7 @@ SELECT * FROM labs_mx WHERE id = 8;
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO objects_mx VALUES (1, 'apple');
|
INSERT INTO objects_mx VALUES (1, 'apple');
|
||||||
INSERT INTO objects_mx VALUES (2, 'BAD');
|
INSERT INTO objects_mx VALUES (2, 'BAD');
|
||||||
WARNING: illegal value
|
ERROR: illegal value
|
||||||
ERROR: could not modify any active placements
|
|
||||||
INSERT INTO labs_mx VALUES (8, 'Aperture Science');
|
INSERT INTO labs_mx VALUES (8, 'Aperture Science');
|
||||||
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
INSERT INTO labs_mx VALUES (9, 'BAD');
|
INSERT INTO labs_mx VALUES (9, 'BAD');
|
||||||
|
|
|
@ -963,12 +963,21 @@ ORDER BY shardid, nodeport;
|
||||||
-- verify data is inserted
|
-- verify data is inserted
|
||||||
SELECT count(*) FROM numbers_hash_failure_test;
|
SELECT count(*) FROM numbers_hash_failure_test;
|
||||||
|
|
||||||
-- connect back to the worker and set rename the test_user back
|
-- break the other node as well
|
||||||
\c - :default_user - :worker_1_port
|
\c - :default_user - :worker_2_port
|
||||||
ALTER USER test_user_new RENAME TO test_user;
|
ALTER USER test_user RENAME TO test_user_new;
|
||||||
|
|
||||||
|
\c - test_user - :master_port
|
||||||
|
|
||||||
|
-- fails on all shard placements
|
||||||
|
INSERT INTO numbers_hash_failure_test VALUES (2,2);
|
||||||
|
|
||||||
-- connect back to the master with the proper user to continue the tests
|
-- connect back to the master with the proper user to continue the tests
|
||||||
\c - :default_user - :master_port
|
\c - :default_user - :master_port
|
||||||
|
|
||||||
|
-- unbreak both nodes by renaming the user back to the original name
|
||||||
|
SELECT * FROM run_command_on_workers('ALTER USER test_user_new RENAME TO test_user');
|
||||||
|
|
||||||
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second,
|
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second,
|
||||||
reference_failure_test, numbers_hash_failure_test;
|
reference_failure_test, numbers_hash_failure_test;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue