mirror of https://github.com/citusdata/citus.git
Merge pull request #552 from citusdata/fail_on_remote_error
Improve error handling during failed modification. PR cr: @jasonmp85
commit 2f07395d65
@@ -827,7 +827,7 @@ MasterPartitionMethod(RangeVar *relation)
 	}
 	else
 	{
-		ReportRemoteError(masterConnection, queryResult);
+		WarnRemoteError(masterConnection, queryResult);
 		ereport(ERROR, (errmsg("could not get the partition method of the "
 							   "distributed table")));
 	}
@@ -924,7 +924,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
 		result = PQexec(connection, "BEGIN");
 		if (PQresultStatus(result) != PGRES_COMMAND_OK)
 		{
-			ReportRemoteError(connection, result);
+			WarnRemoteError(connection, result);
 			failedPlacementList = lappend(failedPlacementList, placement);

 			PQclear(result);
@@ -937,7 +937,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
 		result = PQexec(connection, copyCommand->data);
 		if (PQresultStatus(result) != PGRES_COPY_IN)
 		{
-			ReportRemoteError(connection, result);
+			WarnRemoteError(connection, result);
 			failedPlacementList = lappend(failedPlacementList, placement);

 			PQclear(result);
@@ -1504,7 +1504,7 @@ RemoteCreateEmptyShard(char *relationName)
 	}
 	else
 	{
-		ReportRemoteError(masterConnection, queryResult);
+		WarnRemoteError(masterConnection, queryResult);
 		ereport(ERROR, (errmsg("could not create a new empty shard on the remote node")));
 	}

@@ -141,7 +141,7 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba
 	}
 	else
 	{
-		ReportRemoteError(connection, NULL);
+		WarnRemoteError(connection, NULL);

 		PQfinish(connection);
 		connectionId = INVALID_CONNECTION_ID;
@@ -194,7 +194,7 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
 	}
 	else
 	{
-		ReportRemoteError(connection, NULL);
+		WarnRemoteError(connection, NULL);

 		PQfinish(connection);
 		connectionId = INVALID_CONNECTION_ID;
@@ -249,7 +249,7 @@ MultiClientConnectPoll(int32 connectionId)
 	}
 	else if (pollingStatus == PGRES_POLLING_FAILED)
 	{
-		ReportRemoteError(connection, NULL);
+		WarnRemoteError(connection, NULL);

 		connectStatus = CLIENT_CONNECTION_BAD;
 	}
@@ -433,7 +433,7 @@ MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
 	}
 	else
 	{
-		ReportRemoteError(connection, result);
+		WarnRemoteError(connection, result);
 		PQclear(result);
 	}

@@ -500,7 +500,7 @@ MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount,
 	}
 	else
 	{
-		ReportRemoteError(connection, result);
+		WarnRemoteError(connection, result);
 		PQclear(result);
 		queryStatus = CLIENT_BATCH_QUERY_FAILED;
 	}
@@ -585,7 +585,7 @@ MultiClientQueryStatus(int32 connectionId)
 			copyResults = true;
 		}

-		ReportRemoteError(connection, result);
+		WarnRemoteError(connection, result);
 	}

 	/* clear the result object */
@@ -675,7 +675,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
 		{
 			copyStatus = CLIENT_COPY_FAILED;

-			ReportRemoteError(connection, result);
+			WarnRemoteError(connection, result);
 		}

 		PQclear(result);
@@ -685,7 +685,7 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor)
 		/* received an error */
 		copyStatus = CLIENT_COPY_FAILED;

-		ReportRemoteError(connection, NULL);
+		WarnRemoteError(connection, NULL);
 	}

 	/* if copy out completed, make sure we drain all results from libpq */
@@ -267,7 +267,26 @@ ExecuteDistributedModify(Task *task)
 		result = PQexec(connection, task->queryString);
 		if (PQresultStatus(result) != PGRES_COMMAND_OK)
 		{
-			ReportRemoteError(connection, result);
+			char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
+			int category = 0;
+			bool raiseError = false;
+
+			/*
+			 * If the error code is in constraint violation class, we want to
+			 * fail fast because we must get the same error from all shard
+			 * placements.
+			 */
+			category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
+			raiseError = SqlStateMatchesCategory(sqlStateString, category);
+
+			if (raiseError)
+			{
+				ReraiseRemoteError(connection, result);
+			}
+			else
+			{
+				WarnRemoteError(connection, result);
+			}
 			PQclear(result);

 			failedPlacementList = lappend(failedPlacementList, taskPlacement);
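Note: the fail-fast check above leans on how PostgreSQL packs five-character SQLSTATEs into integers. A minimal standalone sketch follows (not part of the patch; the macro definitions mirror PostgreSQL's utils/elog.h, and the main() driver is invented for illustration) showing why comparing ERRCODE_TO_CATEGORY values detects that unique_violation ("23505") falls in the integrity-constraint class ("23"):

/*
 * Each SQLSTATE character is packed into 6 bits, so masking the low 12 bits
 * keeps exactly the two leading characters that define the error class.
 */
#include <stdio.h>
#include <stdbool.h>

#define PGSIXBIT(ch) (((ch) - '0') & 0x3F)
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5) \
	(PGSIXBIT(ch1) + (PGSIXBIT(ch2) << 6) + (PGSIXBIT(ch3) << 12) + \
	 (PGSIXBIT(ch4) << 18) + (PGSIXBIT(ch5) << 24))
#define ERRCODE_TO_CATEGORY(ec) ((ec) & ((1 << 12) - 1))

/* class 23: integrity constraint violation (SQLSTATE "23000") */
#define ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION MAKE_SQLSTATE('2', '3', '0', '0', '0')

int
main(void)
{
	/* "23505" is unique_violation; it shares the leading "23" class */
	int uniqueViolation = MAKE_SQLSTATE('2', '3', '5', '0', '5');
	int category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);

	bool failFast = (ERRCODE_TO_CATEGORY(uniqueViolation) == category);
	printf("fail fast: %s\n", failFast ? "yes" : "no");	/* prints "yes" */
	return 0;
}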
@@ -451,14 +470,14 @@ SendQueryInSingleRowMode(PGconn *connection, char *query)
 	querySent = PQsendQuery(connection, query);
 	if (querySent == 0)
 	{
-		ReportRemoteError(connection, NULL);
+		WarnRemoteError(connection, NULL);
 		return false;
 	}

 	singleRowMode = PQsetSingleRowMode(connection);
 	if (singleRowMode == 0)
 	{
-		ReportRemoteError(connection, NULL);
+		WarnRemoteError(connection, NULL);
 		return false;
 	}

@@ -505,7 +524,7 @@ StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor,
 		resultStatus = PQresultStatus(result);
 		if ((resultStatus != PGRES_SINGLE_TUPLE) && (resultStatus != PGRES_TUPLES_OK))
 		{
-			ReportRemoteError(connection, result);
+			WarnRemoteError(connection, result);
 			PQclear(result);

 			return false;
@@ -371,14 +371,14 @@ SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections
 		result = PQexec(connection, "BEGIN");
 		if (PQresultStatus(result) != PGRES_COMMAND_OK)
 		{
-			ReportRemoteError(connection, result);
+			WarnRemoteError(connection, result);
 			ereport(ERROR, (errmsg("could not send query to shard placement")));
 		}

 		result = PQexec(connection, shardQueryString);
 		if (PQresultStatus(result) != PGRES_COMMAND_OK)
 		{
-			ReportRemoteError(connection, result);
+			WarnRemoteError(connection, result);
 			ereport(ERROR, (errmsg("could not send query to shard placement")));
 		}

@@ -59,7 +59,7 @@ initialize_remote_temp_table(PG_FUNCTION_ARGS)
 	result = PQexec(connection, POPULATE_TEMP_TABLE);
 	if (PQresultStatus(result) != PGRES_COMMAND_OK)
 	{
-		ReportRemoteError(connection, result);
+		WarnRemoteError(connection, result);
 	}

 	PQclear(result);
@@ -90,7 +90,7 @@ count_remote_temp_table_rows(PG_FUNCTION_ARGS)
 	result = PQexec(connection, COUNT_TEMP_TABLE);
 	if (PQresultStatus(result) != PGRES_TUPLES_OK)
 	{
-		ReportRemoteError(connection, result);
+		WarnRemoteError(connection, result);
 	}
 	else
 	{
@@ -41,6 +41,7 @@ static HTAB *NodeConnectionHash = NULL;

 /* local function forward declarations */
 static HTAB * CreateNodeConnectionHash(void);
+static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError);


 /*
@@ -191,41 +192,92 @@ PurgeConnection(PGconn *connection)


 /*
- * ReportRemoteError retrieves various error fields from the a remote result and
- * produces an error report at the WARNING level.
+ * SqlStateMatchesCategory returns true if the given sql state is in the given
+ * error category. Note that we use ERRCODE_TO_CATEGORY macro to determine error
+ * category of the sql state and expect the caller to use the same macro for the
+ * error category.
  */
+bool
+SqlStateMatchesCategory(char *sqlStateString, int category)
+{
+	bool sqlStateMatchesCategory = false;
+	int sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2],
+								 sqlStateString[3], sqlStateString[4]);
+
+	int sqlStateCategory = ERRCODE_TO_CATEGORY(sqlState);
+	if (sqlStateCategory == category)
+	{
+		sqlStateMatchesCategory = true;
+	}
+
+	return sqlStateMatchesCategory;
+}
+
+
+/*
+ * WarnRemoteError retrieves error fields from a remote result and produces an
+ * error report at the WARNING level after amending the error with a CONTEXT
+ * field containing the remote node host and port information.
+ */
 void
-ReportRemoteError(PGconn *connection, PGresult *result)
+WarnRemoteError(PGconn *connection, PGresult *result)
+{
+	ReportRemoteError(connection, result, false);
+}
+
+
+/*
+ * ReraiseRemoteError retrieves error fields from a remote result and re-raises
+ * the error after amending it with a CONTEXT field containing the remote node
+ * host and port information.
+ */
+void
+ReraiseRemoteError(PGconn *connection, PGresult *result)
+{
+	ReportRemoteError(connection, result, true);
+}
+
+
+/*
+ * ReportRemoteError is an internal helper function which implements logic
+ * needed by both WarnRemoteError and ReraiseRemoteError. They wrap this
+ * function to provide explicit names for the possible behaviors.
+ */
+static void
+ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError)
 {
 	char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
-	char *remoteMessage = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
+	char *messagePrimary = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
+	char *messageDetail = PQresultErrorField(result, PG_DIAG_MESSAGE_DETAIL);
+	char *messageHint = PQresultErrorField(result, PG_DIAG_MESSAGE_HINT);
+	char *messageContext = PQresultErrorField(result, PG_DIAG_CONTEXT);

 	char *nodeName = ConnectionGetOptionValue(connection, "host");
 	char *nodePort = ConnectionGetOptionValue(connection, "port");
-	char *errorPrefix = "Connection failed to";
 	int sqlState = ERRCODE_CONNECTION_FAILURE;
+	int errorLevel = WARNING;

 	if (sqlStateString != NULL)
 	{
 		sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2],
 								 sqlStateString[3], sqlStateString[4]);
 	}

-	/* use more specific error prefix for result failures */
-	if (sqlState != ERRCODE_CONNECTION_FAILURE)
-	{
-		errorPrefix = "Bad result from";
-	}
+	if (raiseError)
+	{
+		errorLevel = ERROR;
+	}

 	/*
 	 * If the PGresult did not contain a message, the connection may provide a
 	 * suitable top level one. At worst, this is an empty string.
 	 */
-	if (remoteMessage == NULL)
+	if (messagePrimary == NULL)
 	{
 		char *lastNewlineIndex = NULL;

-		remoteMessage = PQerrorMessage(connection);
-		lastNewlineIndex = strrchr(remoteMessage, '\n');
+		messagePrimary = PQerrorMessage(connection);
+		lastNewlineIndex = strrchr(messagePrimary, '\n');

 		/* trim trailing newline, if any */
 		if (lastNewlineIndex != NULL)
@@ -234,9 +286,21 @@ ReportRemoteError(PGconn *connection, PGresult *result)
 		}
 	}

-	ereport(WARNING, (errcode(sqlState),
-					  errmsg("%s %s:%s", errorPrefix, nodeName, nodePort),
-					  errdetail("Remote message: %s", remoteMessage)));
+	if (sqlState == ERRCODE_CONNECTION_FAILURE)
+	{
+		ereport(errorLevel, (errcode(sqlState),
+							 errmsg("connection failed to %s:%s", nodeName, nodePort),
+							 errdetail("%s", messagePrimary)));
+	}
+	else
+	{
+		ereport(errorLevel, (errcode(sqlState), errmsg("%s", messagePrimary),
+							 messageDetail ? errdetail("%s", messageDetail) : 0,
+							 messageHint ? errhint("%s", messageHint) : 0,
+							 messageContext ? errcontext("%s", messageContext) : 0,
+							 errcontext("while executing command on %s:%s",
+										nodeName, nodePort)));
+	}
 }

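Note: the reworked helper pulls apart the individual libpq diagnostic fields instead of flattening everything into one "Remote message:" string. A standalone sketch of the same field extraction and connection-level fallback (not part of the patch; assumes a reachable PostgreSQL server at localhost:5432 and compiles with -lpq):

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	PGconn *connection = PQconnectdb("host=localhost port=5432");
	PGresult *result = PQexec(connection, "SELECT 1/0");	/* SQLSTATE 22012 */

	if (PQresultStatus(result) != PGRES_TUPLES_OK)
	{
		char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
		char *primary = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
		char *detail = PQresultErrorField(result, PG_DIAG_MESSAGE_DETAIL);
		char *hint = PQresultErrorField(result, PG_DIAG_MESSAGE_HINT);

		/* any field may be NULL; fall back to the connection-level message,
		 * exactly as the helper above does for the primary message */
		if (primary == NULL)
		{
			primary = PQerrorMessage(connection);
		}

		printf("SQLSTATE: %s\nmessage: %s\ndetail: %s\nhint: %s\n",
			   sqlState ? sqlState : "(none)", primary,
			   detail ? detail : "(none)", hint ? hint : "(none)");
	}

	PQclear(result);
	PQfinish(connection);
	return 0;
}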
@@ -307,7 +371,7 @@ ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser)
 		/* warn if still erroring on final attempt */
 		if (attemptIndex == MAX_CONNECT_ATTEMPTS - 1)
 		{
-			ReportRemoteError(connection, NULL);
+			WarnRemoteError(connection, NULL);
 		}

 		PQfinish(connection);
@@ -75,7 +75,7 @@ PrepareRemoteTransactions(List *connectionList)
 			/* a failure to prepare is an implicit rollback */
 			transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;

-			ReportRemoteError(connection, result);
+			WarnRemoteError(connection, result);
 			PQclear(result);

 			ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
@@ -56,7 +56,9 @@ typedef struct NodeConnectionEntry
 /* function declarations for obtaining and using a connection */
 extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
 extern void PurgeConnection(PGconn *connection);
-extern void ReportRemoteError(PGconn *connection, PGresult *result);
+extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
+extern void WarnRemoteError(PGconn *connection, PGresult *result);
+extern void ReraiseRemoteError(PGconn *connection, PGresult *result);
 extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser);
 extern char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword);
 #endif   /* CONNECTION_CACHE_H */
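Note: a hedged sketch of how a caller would use the split API (the wrapper ExecuteRemoteCommand, its failHard flag, and the header name connection_cache.h are inferred or invented for illustration; only WarnRemoteError and ReraiseRemoteError come from this patch — the real call sites are in the hunks above):

#include "postgres.h"
#include "libpq-fe.h"
#include "distributed/connection_cache.h"	/* assumed header path */

static void
ExecuteRemoteCommand(PGconn *connection, const char *command, bool failHard)
{
	PGresult *result = PQexec(connection, command);

	if (PQresultStatus(result) != PGRES_COMMAND_OK)
	{
		if (failHard)
		{
			/* amends the error with remote host/port CONTEXT, then raises
			 * via ereport(ERROR); control does not return here */
			ReraiseRemoteError(connection, result);
		}
		else
		{
			/* same report demoted to WARNING; execution continues */
			WarnRemoteError(connection, result);
		}
	}

	PQclear(result);
}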
@@ -28,7 +28,7 @@ CREATE FUNCTION set_connection_status_bad(cstring, integer)
 \set VERBOSITY terse
 -- connect to non-existent host
 SELECT initialize_remote_temp_table('dummy-host-name', 12345);
-WARNING:  Connection failed to dummy-host-name:12345
+WARNING:  connection failed to dummy-host-name:12345
  initialize_remote_temp_table
 ------------------------------
  f
@@ -68,7 +68,7 @@ INSERT INTO pg_temp.:"proxy_tablename" (id) VALUES (1);
 -- test copy with bad row in middle
 \set VERBOSITY terse
 COPY pg_temp.:"proxy_tablename" FROM stdin;
-ERROR:  could not modify any active placements
+ERROR:  null value in column "data" violates not-null constraint
 \set VERBOSITY default
 -- verify rows were copied to distributed table
 SELECT * FROM insert_target ORDER BY id ASC;
@@ -139,12 +139,13 @@ ERROR:  creating unique indexes on append-partitioned tables is currently unsupp
 CREATE INDEX lineitem_orderkey_index ON lineitem (l_orderkey);
 ERROR:  relation "lineitem_orderkey_index" already exists
 CREATE INDEX try_index ON lineitem USING gist (l_orderkey);
-WARNING:  Bad result from localhost:57638
-DETAIL:  Remote message: data type bigint has no default operator class for access method "gist"
+WARNING:  data type bigint has no default operator class for access method "gist"
+HINT:  You must specify an operator class for the index or define a default operator class for the data type.
+CONTEXT:  while executing command on localhost:57638
 ERROR:  could not execute DDL command on worker node shards
 CREATE INDEX try_index ON lineitem (non_existent_column);
-WARNING:  Bad result from localhost:57638
-DETAIL:  Remote message: column "non_existent_column" does not exist
+WARNING:  column "non_existent_column" does not exist
+CONTEXT:  while executing command on localhost:57638
 ERROR:  could not execute DDL command on worker node shards
 CREATE INDEX ON lineitem (l_orderkey);
 ERROR:  creating index without a name on a distributed table is currently unsupported
@@ -149,10 +149,14 @@ ERROR:  cannot plan INSERT using row with NULL value in partition column
 -- INSERT violating column constraint
 INSERT INTO limit_orders VALUES (18811, 'BUD', 14962, '2014-04-05 08:32:16', 'sell',
                                 -5.00);
-ERROR:  could not modify any active placements
+ERROR:  new row for relation "limit_orders_750000" violates check constraint "limit_orders_limit_price_check"
+DETAIL:  Failing row contains (18811, BUD, 14962, 2014-04-05 08:32:16, sell, -5.00).
+CONTEXT:  while executing command on localhost:57637
 -- INSERT violating primary key constraint
 INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58);
-ERROR:  could not modify any active placements
+ERROR:  duplicate key value violates unique constraint "limit_orders_pkey_750001"
+DETAIL:  Key (id)=(32743) already exists.
+CONTEXT:  while executing command on localhost:57638
 SET client_min_messages TO DEFAULT;
 -- commands with non-constant partition values are unsupported
 INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
@@ -261,25 +265,25 @@ SELECT kind, limit_price FROM limit_orders WHERE id = 246;
  buy | 0.00
 (1 row)

+-- Test that on unique contraint violations, we fail fast
+INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
+INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
+ERROR:  duplicate key value violates unique constraint "limit_orders_pkey_750001"
+DETAIL:  Key (id)=(275) already exists.
+CONTEXT:  while executing command on localhost:57638
 -- Test that shards which miss a modification are marked unhealthy
--- First: Mark all placements for a node as inactive
-UPDATE pg_dist_shard_placement
-SET shardstate = 3
-WHERE nodename = 'localhost' AND
-      nodeport = :worker_1_port;
--- Second: Perform an INSERT to the remaining node
-INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
--- Third: Mark the original placements as healthy again
-UPDATE pg_dist_shard_placement
-SET shardstate = 1
-WHERE nodename = 'localhost' AND
-      nodeport = :worker_1_port;
--- Fourth: Perform the same INSERT (primary key violation)
-INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
-WARNING:  Bad result from localhost:57638
-DETAIL:  Remote message: duplicate key value violates unique constraint "limit_orders_pkey_750001"
--- Last: Verify the insert worked but the placement with the PK violation is now unhealthy
-SELECT count(*) FROM limit_orders WHERE id = 275;
+-- First: Connect to the second worker node
+\c - - - :worker_2_port
+-- Second: Move aside limit_orders shard on the second worker node
+ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;
+-- Third: Connect back to master node
+\c - - - :master_port
+-- Fourth: Perform an INSERT on the remaining node
+INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
+WARNING:  relation "limit_orders_750000" does not exist
+CONTEXT:  while executing command on localhost:57638
+-- Last: Verify the insert worked but the deleted placement is now unhealthy
+SELECT count(*) FROM limit_orders WHERE id = 276;
  count
 -------
      1
@@ -298,6 +302,39 @@ AND s.logicalrelid = 'limit_orders'::regclass;
      1
 (1 row)

+-- Test that if all shards miss a modification, no state change occurs
+-- First: Connect to the first worker node
+\c - - - :worker_1_port
+-- Second: Move aside limit_orders shard on the second worker node
+ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;
+-- Third: Connect back to master node
+\c - - - :master_port
+-- Fourth: Perform an INSERT on the remaining node
+INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
+WARNING:  relation "limit_orders_750000" does not exist
+CONTEXT:  while executing command on localhost:57637
+ERROR:  could not modify any active placements
+-- Last: Verify worker is still healthy
+SELECT count(*)
+FROM pg_dist_shard_placement AS sp,
+     pg_dist_shard AS s
+WHERE sp.shardid = s.shardid
+AND sp.nodename = 'localhost'
+AND sp.nodeport = :worker_1_port
+AND sp.shardstate = 1
+AND s.logicalrelid = 'limit_orders'::regclass;
+ count
+-------
+     2
+(1 row)
+
+-- Undo our change...
+-- First: Connect to the first worker node
+\c - - - :worker_1_port
+-- Second: Move aside limit_orders shard on the second worker node
+ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
+-- Third: Connect back to master node
+\c - - - :master_port
 -- commands with no constraints on the partition key are not supported
 UPDATE limit_orders SET limit_price = 0.00;
 ERROR:  distributed modifications must target exactly one shard
@@ -261,8 +261,8 @@ ALTER TABLE IF EXISTS non_existent_table ADD COLUMN new_column INTEGER;
 NOTICE:  relation "non_existent_table" does not exist, skipping
 ALTER TABLE IF EXISTS lineitem_alter ALTER COLUMN int_column2 SET DATA TYPE INTEGER;
 ALTER TABLE lineitem_alter DROP COLUMN non_existent_column;
-WARNING:  Bad result from localhost:57638
-DETAIL:  Remote message: column "non_existent_column" of relation "lineitem_alter_220000" does not exist
+WARNING:  column "non_existent_column" of relation "lineitem_alter_220000" does not exist
+CONTEXT:  while executing command on localhost:57638
 ERROR:  could not execute DDL command on worker node shards
 ALTER TABLE lineitem_alter DROP COLUMN IF EXISTS non_existent_column;
 NOTICE:  column "non_existent_column" of relation "lineitem_alter" does not exist, skipping
@@ -361,16 +361,16 @@ DETAIL:  Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT and TYPE subc
 -- Verify that we error out in case of postgres errors on supported statement
 -- types
 ALTER TABLE lineitem_alter ADD COLUMN new_column non_existent_type;
-WARNING:  Bad result from localhost:57638
-DETAIL:  Remote message: type "non_existent_type" does not exist
+WARNING:  type "non_existent_type" does not exist
+CONTEXT:  while executing command on localhost:57638
 ERROR:  could not execute DDL command on worker node shards
 ALTER TABLE lineitem_alter ALTER COLUMN null_column SET NOT NULL;
-WARNING:  Bad result from localhost:57638
-DETAIL:  Remote message: column "null_column" contains null values
+WARNING:  column "null_column" contains null values
+CONTEXT:  while executing command on localhost:57638
 ERROR:  could not execute DDL command on worker node shards
 ALTER TABLE lineitem_alter ALTER COLUMN l_partkey SET DEFAULT 'a';
-WARNING:  Bad result from localhost:57638
-DETAIL:  Remote message: invalid input syntax for integer: "a"
+WARNING:  invalid input syntax for integer: "a"
+CONTEXT:  while executing command on localhost:57638
 ERROR:  could not execute DDL command on worker node shards
 -- Verify that we error out on statements involving RENAME
 ALTER TABLE lineitem_alter RENAME TO lineitem_renamed;
@@ -455,8 +455,8 @@ COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (
 COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
 -- Test if there is no relation to copy data with the worker copy
 COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-WARNING:  Bad result from localhost:57636
-DETAIL:  Remote message: relation "lineitem_copy_none" does not exist
+WARNING:  relation "lineitem_copy_none" does not exist
+CONTEXT:  while executing command on localhost:57636
 ERROR:  could not run copy from the worker node
 -- Connect back to the master node
 \c - - - 57636
@@ -190,28 +190,27 @@ SELECT bidder_id FROM limit_orders WHERE id = 246;
 UPDATE limit_orders SET (kind, limit_price) = ('buy', DEFAULT) WHERE id = 246;
 SELECT kind, limit_price FROM limit_orders WHERE id = 246;

+-- Test that on unique contraint violations, we fail fast
+INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
+INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
+
 -- Test that shards which miss a modification are marked unhealthy

--- First: Mark all placements for a node as inactive
-UPDATE pg_dist_shard_placement
-SET shardstate = 3
-WHERE nodename = 'localhost' AND
-      nodeport = :worker_1_port;
+-- First: Connect to the second worker node
+\c - - - :worker_2_port

--- Second: Perform an INSERT to the remaining node
-INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
+-- Second: Move aside limit_orders shard on the second worker node
+ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;

--- Third: Mark the original placements as healthy again
-UPDATE pg_dist_shard_placement
-SET shardstate = 1
-WHERE nodename = 'localhost' AND
-      nodeport = :worker_1_port;
+-- Third: Connect back to master node
+\c - - - :master_port

--- Fourth: Perform the same INSERT (primary key violation)
-INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
+-- Fourth: Perform an INSERT on the remaining node
+INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);

--- Last: Verify the insert worked but the placement with the PK violation is now unhealthy
-SELECT count(*) FROM limit_orders WHERE id = 275;
+-- Last: Verify the insert worked but the deleted placement is now unhealthy
+SELECT count(*) FROM limit_orders WHERE id = 276;
+
 SELECT count(*)
 FROM pg_dist_shard_placement AS sp,
      pg_dist_shard AS s
@@ -221,6 +220,41 @@ AND sp.nodeport = :worker_2_port
 AND sp.shardstate = 3
 AND s.logicalrelid = 'limit_orders'::regclass;

+-- Test that if all shards miss a modification, no state change occurs
+
+-- First: Connect to the first worker node
+\c - - - :worker_1_port
+
+-- Second: Move aside limit_orders shard on the second worker node
+ALTER TABLE limit_orders_750000 RENAME TO renamed_orders;
+
+-- Third: Connect back to master node
+\c - - - :master_port
+
+-- Fourth: Perform an INSERT on the remaining node
+INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
+
+-- Last: Verify worker is still healthy
+SELECT count(*)
+FROM pg_dist_shard_placement AS sp,
+     pg_dist_shard AS s
+WHERE sp.shardid = s.shardid
+AND sp.nodename = 'localhost'
+AND sp.nodeport = :worker_1_port
+AND sp.shardstate = 1
+AND s.logicalrelid = 'limit_orders'::regclass;
+
+-- Undo our change...
+
+-- First: Connect to the first worker node
+\c - - - :worker_1_port
+
+-- Second: Move aside limit_orders shard on the second worker node
+ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
+
+-- Third: Connect back to master node
+\c - - - :master_port
+
 -- commands with no constraints on the partition key are not supported
 UPDATE limit_orders SET limit_price = 0.00;
