SortList in FinalizedShardPlacementList, makes 3 failure tests consistent between 11/12

pull/2844/head
Philip Dubé 2019-08-22 17:30:41 +00:00
parent 693d4695d7
commit 6b0d8ed83d
10 changed files with 179 additions and 232 deletions

View File

@ -30,11 +30,13 @@
#include "commands/extension.h" #include "commands/extension.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/listutils.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard.h"
@ -408,6 +410,35 @@ ErrorIfNotSuitableToGetSize(Oid relationId)
} }
/*
* CompareShardPlacementsByWorker compares two shard placements by their
* worker node name and port.
*/
int
CompareShardPlacementsByWorker(const void *leftElement, const void *rightElement)
{
const ShardPlacement *leftPlacement = *((const ShardPlacement **) leftElement);
const ShardPlacement *rightPlacement = *((const ShardPlacement **) rightElement);
int nodeNameCmp = strncmp(leftPlacement->nodeName, rightPlacement->nodeName,
WORKER_LENGTH);
if (nodeNameCmp != 0)
{
return nodeNameCmp;
}
else if (leftPlacement->nodePort > rightPlacement->nodePort)
{
return 1;
}
else if (leftPlacement->nodePort < rightPlacement->nodePort)
{
return -1;
}
return 0;
}
/* /*
* TableShardReplicationFactor returns the current replication factor of the * TableShardReplicationFactor returns the current replication factor of the
* given relation by looking into shard placements. It errors out if there * given relation by looking into shard placements. It errors out if there
@ -693,7 +724,7 @@ FinalizedShardPlacementList(uint64 shardId)
} }
} }
return finalizedPlacementList; return SortList(finalizedPlacementList, CompareShardPlacementsByWorker);
} }

View File

@ -40,11 +40,6 @@
#include "utils/palloc.h" #include "utils/palloc.h"
/* forward declaration of local functions */
static int CompareShardPlacementsByWorker(const void *leftElement,
const void *rightElement);
/* declarations for dynamic loading */ /* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(load_shard_id_array); PG_FUNCTION_INFO_V1(load_shard_id_array);
PG_FUNCTION_INFO_V1(load_shard_interval_array); PG_FUNCTION_INFO_V1(load_shard_interval_array);
@ -171,35 +166,6 @@ load_shard_placement_array(PG_FUNCTION_ARGS)
} }
/*
* CompareShardPlacementsByWorker compares two shard placements by their
* worker node name and port.
*/
static int
CompareShardPlacementsByWorker(const void *leftElement, const void *rightElement)
{
const ShardPlacement *leftPlacement = *((const ShardPlacement **) leftElement);
const ShardPlacement *rightPlacement = *((const ShardPlacement **) rightElement);
int nodeNameCmp = strncmp(leftPlacement->nodeName, rightPlacement->nodeName,
WORKER_LENGTH);
if (nodeNameCmp != 0)
{
return nodeNameCmp;
}
else if (leftPlacement->nodePort > rightPlacement->nodePort)
{
return 1;
}
else if (leftPlacement->nodePort < rightPlacement->nodePort)
{
return -1;
}
return 0;
}
/* /*
* partition_column_id simply finds a distributed table using the provided Oid * partition_column_id simply finds a distributed table using the provided Oid
* and returns the column_id of its partition column. If the specified table is * and returns the column_id of its partition column. If the specified table is

View File

@ -147,6 +147,8 @@ extern char * ConstructQualifiedShardName(ShardInterval *shardInterval);
extern uint64 GetFirstShardId(Oid relationId); extern uint64 GetFirstShardId(Oid relationId);
extern Datum StringToDatum(char *inputString, Oid dataType); extern Datum StringToDatum(char *inputString, Oid dataType);
extern char * DatumToString(Datum datum, Oid dataType); extern char * DatumToString(Datum datum, Oid dataType);
extern int CompareShardPlacementsByWorker(const void *leftElement,
const void *rightElement);
#endif /* MASTER_METADATA_UTILITY_H */ #endif /* MASTER_METADATA_UTILITY_H */

View File

@ -52,8 +52,8 @@ SELECT citus.dump_network_traffic();
(0,worker,"[""RowDescription(fieldcount=2,fields=['F(name=min,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)', 'F(name=max,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=2,columns=[""C(length=0,value=b\\'\\')"", ""C(length=1,value=b\\'0\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']") (0,worker,"[""RowDescription(fieldcount=2,fields=['F(name=min,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)', 'F(name=max,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=2,columns=[""C(length=0,value=b\\'\\')"", ""C(length=1,value=b\\'0\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']")
(0,coordinator,"['Query(query=COMMIT)']") (0,coordinator,"['Query(query=COMMIT)']")
(0,worker,"['CommandComplete(command=COMMIT)', 'ReadyForQuery(state=idle)']") (0,worker,"['CommandComplete(command=COMMIT)', 'ReadyForQuery(state=idle)']")
(0,coordinator,"['Query(query=COPY (SELECT count(1) AS count FROM copy_test_100400 copy_test WHERE true) TO STDOUT)']") (0,coordinator,"['Query(query=SELECT count(1) AS count FROM copy_test_100400 copy_test WHERE true)']")
(0,worker,"[""CopyOutResponse(format=0,columncount=1,columns=['Anonymous(format=0)'])"", ""CopyData(data=b'4\\\\n')"", 'CopyDone()', 'CommandComplete(command=COPY 1)', 'ReadyForQuery(state=idle)']") (0,worker,"[""RowDescription(fieldcount=1,fields=['F(name=count,tableoid=0,colattrnum=0,typoid=20,typlen=8,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']")
(20 rows) (20 rows)
---- all of the following tests test behavior with 2 shard placements ---- ---- all of the following tests test behavior with 2 shard placements ----
@ -168,7 +168,10 @@ SELECT citus.mitmproxy('conn.onQuery(query="SELECT|COPY").kill()');
(1 row) (1 row)
SELECT count(1) FROM copy_test; SELECT count(1) FROM copy_test;
WARNING: could not consume data from worker node WARNING: connection error: localhost:9060
DETAIL: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
count count
------- -------
4 4
@ -224,26 +227,6 @@ SELECT count(1) FROM copy_test;
4 4
(1 row) (1 row)
-- we round-robin when picking which node to run pg_table_size on, this COPY runs it on
-- the other node, so the next copy will try to run it on our node
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY p.nodeport, p.placementid;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 9060 | 110
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57637 | 111
(4 rows)
SELECT count(1) FROM copy_test;
count
-------
8
(1 row)
---- kill the connection when we try to get the min, max of the table ---- ---- kill the connection when we try to get the min, max of the table ----
SELECT citus.mitmproxy('conn.onQuery(query="SELECT min\(key\), max\(key\)").kill()'); SELECT citus.mitmproxy('conn.onQuery(query="SELECT min\(key\), max\(key\)").kill()');
mitmproxy mitmproxy
@ -266,14 +249,12 @@ SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- --------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 9060 | 110 (2 rows)
copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57637 | 111
(4 rows)
SELECT count(1) FROM copy_test; SELECT count(1) FROM copy_test;
count count
------- -------
8 4
(1 row) (1 row)
---- kill the connection when we try to COMMIT ---- ---- kill the connection when we try to COMMIT ----
@ -296,16 +277,14 @@ SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- --------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 9060 | 110 copy_test | 100408 | t | 0 | 3 | 100408 | 1 | 8192 | localhost | 57637 | 112
copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57637 | 111 copy_test | 100408 | t | 0 | 3 | 100408 | 3 | 8192 | localhost | 9060 | 113
copy_test | 100409 | t | 0 | 3 | 100409 | 3 | 8192 | localhost | 9060 | 114 (4 rows)
copy_test | 100409 | t | 0 | 3 | 100409 | 1 | 8192 | localhost | 57637 | 115
(6 rows)
SELECT count(1) FROM copy_test; SELECT count(1) FROM copy_test;
count count
------- -------
12 8
(1 row) (1 row)
-- ==== Clean up, we're done here ==== -- ==== Clean up, we're done here ====

View File

@ -52,8 +52,8 @@ SELECT citus.dump_network_traffic();
(0,worker,"[""RowDescription(fieldcount=2,fields=['F(name=min,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)', 'F(name=max,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=2,columns=[""C(length=0,value=b\\'\\')"", ""C(length=1,value=b\\'0\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']") (0,worker,"[""RowDescription(fieldcount=2,fields=['F(name=min,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)', 'F(name=max,tableoid=0,colattrnum=0,typoid=23,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=2,columns=[""C(length=0,value=b\\'\\')"", ""C(length=1,value=b\\'0\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']")
(0,coordinator,"['Query(query=COMMIT)']") (0,coordinator,"['Query(query=COMMIT)']")
(0,worker,"['CommandComplete(command=COMMIT)', 'ReadyForQuery(state=idle)']") (0,worker,"['CommandComplete(command=COMMIT)', 'ReadyForQuery(state=idle)']")
(0,coordinator,"['Query(query=SELECT count(1) AS count FROM copy_test_100400 copy_test WHERE true)']") (0,coordinator,"['Query(query=COPY (SELECT count(1) AS count FROM copy_test_100400 copy_test WHERE true) TO STDOUT)']")
(0,worker,"[""RowDescription(fieldcount=1,fields=['F(name=count,tableoid=0,colattrnum=0,typoid=20,typlen=8,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']") (0,worker,"[""CopyOutResponse(format=0,columncount=1,columns=['Anonymous(format=0)'])"", ""CopyData(data=b'4\\\\n')"", 'CopyDone()', 'CommandComplete(command=COPY 1)', 'ReadyForQuery(state=idle)']")
(20 rows) (20 rows)
---- all of the following tests test behavior with 2 shard placements ---- ---- all of the following tests test behavior with 2 shard placements ----
@ -168,10 +168,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="SELECT|COPY").kill()');
(1 row) (1 row)
SELECT count(1) FROM copy_test; SELECT count(1) FROM copy_test;
WARNING: connection error: localhost:9060 WARNING: could not consume data from worker node
DETAIL: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
count count
------- -------
4 4
@ -227,26 +224,6 @@ SELECT count(1) FROM copy_test;
4 4
(1 row) (1 row)
-- we round-robin when picking which node to run pg_table_size on, this COPY runs it on
-- the other node, so the next copy will try to run it on our node
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY p.nodeport, p.placementid;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid
--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 9060 | 110
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57637 | 111
(4 rows)
SELECT count(1) FROM copy_test;
count
-------
8
(1 row)
---- kill the connection when we try to get the min, max of the table ---- ---- kill the connection when we try to get the min, max of the table ----
SELECT citus.mitmproxy('conn.onQuery(query="SELECT min\(key\), max\(key\)").kill()'); SELECT citus.mitmproxy('conn.onQuery(query="SELECT min\(key\), max\(key\)").kill()');
mitmproxy mitmproxy
@ -269,14 +246,12 @@ SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- --------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 9060 | 110 (2 rows)
copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57637 | 111
(4 rows)
SELECT count(1) FROM copy_test; SELECT count(1) FROM copy_test;
count count
------- -------
8 4
(1 row) (1 row)
---- kill the connection when we try to COMMIT ---- ---- kill the connection when we try to COMMIT ----
@ -299,16 +274,14 @@ SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- --------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100
copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101
copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 9060 | 110 copy_test | 100408 | t | 0 | 3 | 100408 | 1 | 8192 | localhost | 57637 | 112
copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57637 | 111 copy_test | 100408 | t | 0 | 3 | 100408 | 3 | 8192 | localhost | 9060 | 113
copy_test | 100409 | t | 0 | 3 | 100409 | 3 | 8192 | localhost | 9060 | 114 (4 rows)
copy_test | 100409 | t | 0 | 3 | 100409 | 1 | 8192 | localhost | 57637 | 115
(6 rows)
SELECT count(1) FROM copy_test; SELECT count(1) FROM copy_test;
count count
------- -------
12 8
(1 row) (1 row)
-- ==== Clean up, we're done here ==== -- ==== Clean up, we're done here ====

View File

@ -45,7 +45,11 @@ SELECT citus.dump_network_traffic();
(0,worker,"['CommandComplete(command=COPY 4)', 'ReadyForQuery(state=in_transaction_block)']") (0,worker,"['CommandComplete(command=COPY 4)', 'ReadyForQuery(state=in_transaction_block)']")
(0,coordinator,"['Query(query=COMMIT)']") (0,coordinator,"['Query(query=COMMIT)']")
(0,worker,"['CommandComplete(command=COMMIT)', 'ReadyForQuery(state=idle)']") (0,worker,"['CommandComplete(command=COMMIT)', 'ReadyForQuery(state=idle)']")
(10 rows) (1,coordinator,"[initial message]")
(1,worker,"['AuthenticationOk()', 'ParameterStatus(application_name=citus)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(server_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(TimeZone=XXX)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']")
(1,coordinator,"['Query(query=SELECT count(1) AS count FROM public.copy_test_XXXXXX copy_test)']")
(1,worker,"[""RowDescription(fieldcount=1,fields=['F(name=count,tableoid=0,colattrnum=0,typoid=20,typlen=8,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=idle)']")
(14 rows)
-- ==== kill the connection when we try to start a transaction ==== -- ==== kill the connection when we try to start a transaction ====
-- the query should abort -- the query should abort

View File

@ -20,31 +20,31 @@ SELECT create_distributed_table('select_test', 'key');
(1 row) (1 row)
-- put data in shard for which mitm node is first placement -- put data in shard for which mitm node is first placement
INSERT INTO select_test VALUES (2, 'test data'); INSERT INTO select_test VALUES (3, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").kill()'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").kill()');
mitmproxy mitmproxy
----------- -----------
(1 row) (1 row)
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
WARNING: server closed the connection unexpectedly WARNING: connection error: localhost:9060
DETAIL: server closed the connection unexpectedly
This probably means the server terminated abnormally This probably means the server terminated abnormally
before or while processing the request. before or while processing the request.
CONTEXT: while executing command on localhost:9060
key | value key | value
-----+----------- -----+-----------
2 | test data 3 | test data
(1 row) (1 row)
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
WARNING: server closed the connection unexpectedly WARNING: connection error: localhost:9060
DETAIL: server closed the connection unexpectedly
This probably means the server terminated abnormally This probably means the server terminated abnormally
before or while processing the request. before or while processing the request.
CONTEXT: while executing command on localhost:9060
key | value key | value
-----+----------- -----+-----------
2 | test data 3 | test data
(1 row) (1 row)
-- kill after first SELECT; txn should work (though placement marked bad) -- kill after first SELECT; txn should work (though placement marked bad)
@ -55,34 +55,32 @@ SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").kill()');
(1 row) (1 row)
BEGIN; BEGIN;
INSERT INTO select_test VALUES (2, 'more data'); INSERT INTO select_test VALUES (3, 'more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
WARNING: server closed the connection unexpectedly WARNING: connection error: localhost:9060
DETAIL: server closed the connection unexpectedly
This probably means the server terminated abnormally This probably means the server terminated abnormally
before or while processing the request. before or while processing the request.
CONTEXT: while executing command on localhost:9060
key | value key | value
-----+----------- -----+-----------
2 | test data 3 | test data
2 | more data 3 | more data
(2 rows) (2 rows)
INSERT INTO select_test VALUES (2, 'even more data'); INSERT INTO select_test VALUES (3, 'even more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
WARNING: connection error: localhost:9060
DETAIL: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
key | value key | value
-----+---------------- -----+----------------
2 | test data 3 | test data
2 | more data 3 | more data
2 | even more data 3 | even more data
(3 rows) (3 rows)
COMMIT; COMMIT;
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
-- some clean up -- some clean up
UPDATE pg_dist_shard_placement SET shardstate = 1 UPDATE pg_dist_shard_placement SET shardstate = 1
WHERE shardid IN ( WHERE shardid IN (
@ -91,16 +89,16 @@ WHERE shardid IN (
TRUNCATE select_test; TRUNCATE select_test;
-- now the same tests with query cancellation -- now the same tests with query cancellation
-- put data in shard for which mitm node is first placement -- put data in shard for which mitm node is first placement
INSERT INTO select_test VALUES (2, 'test data'); INSERT INTO select_test VALUES (3, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").cancel(' || pg_backend_pid() || ')'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").cancel(' || pg_backend_pid() || ')');
mitmproxy mitmproxy
----------- -----------
(1 row) (1 row)
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
ERROR: canceling statement due to user request ERROR: canceling statement due to user request
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
ERROR: canceling statement due to user request ERROR: canceling statement due to user request
-- cancel after first SELECT; txn should fail and nothing should be marked as invalid -- cancel after first SELECT; txn should fail and nothing should be marked as invalid
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").cancel(' || pg_backend_pid() || ')'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").cancel(' || pg_backend_pid() || ')');
@ -110,8 +108,8 @@ SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").cancel(' || pg_backend_pi
(1 row) (1 row)
BEGIN; BEGIN;
INSERT INTO select_test VALUES (2, 'more data'); INSERT INTO select_test VALUES (3, 'more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
ERROR: canceling statement due to user request ERROR: canceling statement due to user request
COMMIT; COMMIT;
-- show that all placements are OK -- show that all placements are OK
@ -134,15 +132,15 @@ SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).cancel(' || pg_b
(1 row) (1 row)
BEGIN; BEGIN;
INSERT INTO select_test VALUES (2, 'more data'); INSERT INTO select_test VALUES (3, 'more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
key | value key | value
-----+----------- -----+-----------
2 | more data 3 | more data
(1 row) (1 row)
INSERT INTO select_test VALUES (2, 'even more data'); INSERT INTO select_test VALUES (3, 'even more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
ERROR: canceling statement due to user request ERROR: canceling statement due to user request
COMMIT; COMMIT;
-- error after second SELECT; txn should work (though placement marked bad) -- error after second SELECT; txn should work (though placement marked bad)
@ -153,32 +151,26 @@ SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).reset()');
(1 row) (1 row)
BEGIN; BEGIN;
INSERT INTO select_test VALUES (2, 'more data'); INSERT INTO select_test VALUES (3, 'more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
key | value key | value
-----+----------- -----+-----------
2 | more data 3 | more data
(1 row) (1 row)
INSERT INTO select_test VALUES (2, 'even more data'); INSERT INTO select_test VALUES (3, 'even more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
WARNING: server closed the connection unexpectedly WARNING: connection error: localhost:9060
DETAIL: server closed the connection unexpectedly
This probably means the server terminated abnormally This probably means the server terminated abnormally
before or while processing the request. before or while processing the request.
CONTEXT: while executing command on localhost:9060
key | value key | value
-----+---------------- -----+----------------
2 | more data 3 | more data
2 | even more data 3 | even more data
(2 rows) (2 rows)
COMMIT; COMMIT;
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(2).kill()'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(2).kill()');
mitmproxy mitmproxy
----------- -----------
@ -223,11 +215,10 @@ SELECT * FROM select_test WHERE key = 1;
(1 row) (1 row)
SELECT * FROM select_test WHERE key = 1; SELECT * FROM select_test WHERE key = 1;
WARNING: server closed the connection unexpectedly ERROR: connection error: localhost:9060
DETAIL: server closed the connection unexpectedly
This probably means the server terminated abnormally This probably means the server terminated abnormally
before or while processing the request. before or while processing the request.
CONTEXT: while executing command on localhost:9060
ERROR: could not receive query results
-- now the same test with query cancellation -- now the same test with query cancellation
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).cancel(' || pg_backend_pid() || ')'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).cancel(' || pg_backend_pid() || ')');
mitmproxy mitmproxy

View File

@ -20,31 +20,31 @@ SELECT create_distributed_table('select_test', 'key');
(1 row) (1 row)
-- put data in shard for which mitm node is first placement -- put data in shard for which mitm node is first placement
INSERT INTO select_test VALUES (2, 'test data'); INSERT INTO select_test VALUES (3, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").kill()'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").kill()');
mitmproxy mitmproxy
----------- -----------
(1 row) (1 row)
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
WARNING: connection error: localhost:9060 WARNING: server closed the connection unexpectedly
DETAIL: server closed the connection unexpectedly
This probably means the server terminated abnormally This probably means the server terminated abnormally
before or while processing the request. before or while processing the request.
CONTEXT: while executing command on localhost:9060
key | value key | value
-----+----------- -----+-----------
2 | test data 3 | test data
(1 row) (1 row)
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
WARNING: connection error: localhost:9060 WARNING: server closed the connection unexpectedly
DETAIL: server closed the connection unexpectedly
This probably means the server terminated abnormally This probably means the server terminated abnormally
before or while processing the request. before or while processing the request.
CONTEXT: while executing command on localhost:9060
key | value key | value
-----+----------- -----+-----------
2 | test data 3 | test data
(1 row) (1 row)
-- kill after first SELECT; txn should work (though placement marked bad) -- kill after first SELECT; txn should work (though placement marked bad)
@ -55,32 +55,34 @@ SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").kill()');
(1 row) (1 row)
BEGIN; BEGIN;
INSERT INTO select_test VALUES (2, 'more data'); INSERT INTO select_test VALUES (3, 'more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
WARNING: connection error: localhost:9060 WARNING: server closed the connection unexpectedly
DETAIL: server closed the connection unexpectedly
This probably means the server terminated abnormally This probably means the server terminated abnormally
before or while processing the request. before or while processing the request.
CONTEXT: while executing command on localhost:9060
key | value key | value
-----+----------- -----+-----------
2 | test data 3 | test data
2 | more data 3 | more data
(2 rows) (2 rows)
INSERT INTO select_test VALUES (2, 'even more data'); INSERT INTO select_test VALUES (3, 'even more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
WARNING: connection error: localhost:9060
DETAIL: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
key | value key | value
-----+---------------- -----+----------------
2 | test data 3 | test data
2 | more data 3 | more data
2 | even more data 3 | even more data
(3 rows) (3 rows)
COMMIT; COMMIT;
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
-- some clean up -- some clean up
UPDATE pg_dist_shard_placement SET shardstate = 1 UPDATE pg_dist_shard_placement SET shardstate = 1
WHERE shardid IN ( WHERE shardid IN (
@ -89,16 +91,16 @@ WHERE shardid IN (
TRUNCATE select_test; TRUNCATE select_test;
-- now the same tests with query cancellation -- now the same tests with query cancellation
-- put data in shard for which mitm node is first placement -- put data in shard for which mitm node is first placement
INSERT INTO select_test VALUES (2, 'test data'); INSERT INTO select_test VALUES (3, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").cancel(' || pg_backend_pid() || ')'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").cancel(' || pg_backend_pid() || ')');
mitmproxy mitmproxy
----------- -----------
(1 row) (1 row)
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
ERROR: canceling statement due to user request ERROR: canceling statement due to user request
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
ERROR: canceling statement due to user request ERROR: canceling statement due to user request
-- cancel after first SELECT; txn should fail and nothing should be marked as invalid -- cancel after first SELECT; txn should fail and nothing should be marked as invalid
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").cancel(' || pg_backend_pid() || ')'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").cancel(' || pg_backend_pid() || ')');
@ -108,8 +110,8 @@ SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").cancel(' || pg_backend_pi
(1 row) (1 row)
BEGIN; BEGIN;
INSERT INTO select_test VALUES (2, 'more data'); INSERT INTO select_test VALUES (3, 'more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
ERROR: canceling statement due to user request ERROR: canceling statement due to user request
COMMIT; COMMIT;
-- show that all placements are OK -- show that all placements are OK
@ -132,15 +134,15 @@ SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).cancel(' || pg_b
(1 row) (1 row)
BEGIN; BEGIN;
INSERT INTO select_test VALUES (2, 'more data'); INSERT INTO select_test VALUES (3, 'more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
key | value key | value
-----+----------- -----+-----------
2 | more data 3 | more data
(1 row) (1 row)
INSERT INTO select_test VALUES (2, 'even more data'); INSERT INTO select_test VALUES (3, 'even more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
ERROR: canceling statement due to user request ERROR: canceling statement due to user request
COMMIT; COMMIT;
-- error after second SELECT; txn should work (though placement marked bad) -- error after second SELECT; txn should work (though placement marked bad)
@ -151,26 +153,32 @@ SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).reset()');
(1 row) (1 row)
BEGIN; BEGIN;
INSERT INTO select_test VALUES (2, 'more data'); INSERT INTO select_test VALUES (3, 'more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
key | value key | value
-----+----------- -----+-----------
2 | more data 3 | more data
(1 row) (1 row)
INSERT INTO select_test VALUES (2, 'even more data'); INSERT INTO select_test VALUES (3, 'even more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
WARNING: connection error: localhost:9060 WARNING: server closed the connection unexpectedly
DETAIL: server closed the connection unexpectedly
This probably means the server terminated abnormally This probably means the server terminated abnormally
before or while processing the request. before or while processing the request.
CONTEXT: while executing command on localhost:9060
key | value key | value
-----+---------------- -----+----------------
2 | more data 3 | more data
2 | even more data 3 | even more data
(2 rows) (2 rows)
COMMIT; COMMIT;
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(2).kill()'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(2).kill()');
mitmproxy mitmproxy
----------- -----------
@ -215,10 +223,11 @@ SELECT * FROM select_test WHERE key = 1;
(1 row) (1 row)
SELECT * FROM select_test WHERE key = 1; SELECT * FROM select_test WHERE key = 1;
ERROR: connection error: localhost:9060 WARNING: server closed the connection unexpectedly
DETAIL: server closed the connection unexpectedly
This probably means the server terminated abnormally This probably means the server terminated abnormally
before or while processing the request. before or while processing the request.
CONTEXT: while executing command on localhost:9060
ERROR: could not receive query results
-- now the same test with query cancellation -- now the same test with query cancellation
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).cancel(' || pg_backend_pid() || ')'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).cancel(' || pg_backend_pid() || ')');
mitmproxy mitmproxy

View File

@ -72,14 +72,6 @@ SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
ORDER BY placementid; ORDER BY placementid;
SELECT count(1) FROM copy_test; SELECT count(1) FROM copy_test;
-- we round-robin when picking which node to run pg_table_size on, this COPY runs it on
-- the other node, so the next copy will try to run it on our node
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
ORDER BY p.nodeport, p.placementid;
SELECT count(1) FROM copy_test;
---- kill the connection when we try to get the min, max of the table ---- ---- kill the connection when we try to get the min, max of the table ----
SELECT citus.mitmproxy('conn.onQuery(query="SELECT min\(key\), max\(key\)").kill()'); SELECT citus.mitmproxy('conn.onQuery(query="SELECT min\(key\), max\(key\)").kill()');
COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;

View File

@ -8,20 +8,20 @@ CREATE TABLE select_test (key int, value text);
SELECT create_distributed_table('select_test', 'key'); SELECT create_distributed_table('select_test', 'key');
-- put data in shard for which mitm node is first placement -- put data in shard for which mitm node is first placement
INSERT INTO select_test VALUES (2, 'test data'); INSERT INTO select_test VALUES (3, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").kill()'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").kill()');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
-- kill after first SELECT; txn should work (though placement marked bad) -- kill after first SELECT; txn should work (though placement marked bad)
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").kill()'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").kill()');
BEGIN; BEGIN;
INSERT INTO select_test VALUES (2, 'more data'); INSERT INTO select_test VALUES (3, 'more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
INSERT INTO select_test VALUES (2, 'even more data'); INSERT INTO select_test VALUES (3, 'even more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
COMMIT; COMMIT;
-- some clean up -- some clean up
@ -34,18 +34,18 @@ TRUNCATE select_test;
-- now the same tests with query cancellation -- now the same tests with query cancellation
-- put data in shard for which mitm node is first placement -- put data in shard for which mitm node is first placement
INSERT INTO select_test VALUES (2, 'test data'); INSERT INTO select_test VALUES (3, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").cancel(' || pg_backend_pid() || ')'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").cancel(' || pg_backend_pid() || ')');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
-- cancel after first SELECT; txn should fail and nothing should be marked as invalid -- cancel after first SELECT; txn should fail and nothing should be marked as invalid
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").cancel(' || pg_backend_pid() || ')'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").cancel(' || pg_backend_pid() || ')');
BEGIN; BEGIN;
INSERT INTO select_test VALUES (2, 'more data'); INSERT INTO select_test VALUES (3, 'more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
COMMIT; COMMIT;
-- show that all placements are OK -- show that all placements are OK
@ -60,20 +60,20 @@ TRUNCATE select_test;
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).cancel(' || pg_backend_pid() || ')'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).cancel(' || pg_backend_pid() || ')');
BEGIN; BEGIN;
INSERT INTO select_test VALUES (2, 'more data'); INSERT INTO select_test VALUES (3, 'more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
INSERT INTO select_test VALUES (2, 'even more data'); INSERT INTO select_test VALUES (3, 'even more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
COMMIT; COMMIT;
-- error after second SELECT; txn should work (though placement marked bad) -- error after second SELECT; txn should work (though placement marked bad)
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).reset()'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(1).reset()');
BEGIN; BEGIN;
INSERT INTO select_test VALUES (2, 'more data'); INSERT INTO select_test VALUES (3, 'more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
INSERT INTO select_test VALUES (2, 'even more data'); INSERT INTO select_test VALUES (3, 'even more data');
SELECT * FROM select_test WHERE key = 2; SELECT * FROM select_test WHERE key = 3;
COMMIT; COMMIT;
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(2).kill()'); SELECT citus.mitmproxy('conn.onQuery(query="^SELECT").after(2).kill()');