mirror of https://github.com/citusdata/citus.git
Isolation test and bug fix
parent
55770d2816
commit
fb42c17ac3
|
@ -253,10 +253,13 @@ ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
|
|||
{
|
||||
int32 shardSplitPointValue = DatumGetInt32(shardSplitPoint);
|
||||
|
||||
/* All Split points should lie within the shard interval range. */
|
||||
int splitPointShardIndex = FindShardIntervalIndex(shardSplitPoint,
|
||||
cachedTableEntry);
|
||||
if (shardIntervalToSplit->shardIndex != splitPointShardIndex)
|
||||
/*
|
||||
* 1) All Split points should lie within the shard interval range.
|
||||
* 2) Given our split points inclusive, you cannot specify the max value in a range as a split point.
|
||||
* Example: Shard 81060002 range is from (0,1073741823). '1073741823' as split point is invalid.
|
||||
* '1073741822' is correct and will split shard to: (0, 1073741822) and (1073741823, 1073741823).
|
||||
*/
|
||||
if (shardSplitPointValue < minValue || shardSplitPointValue > maxValue)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
|
@ -267,6 +270,16 @@ ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
|
|||
DatumGetInt32(shardIntervalToSplit->maxValue),
|
||||
shardIntervalToSplit->shardId)));
|
||||
}
|
||||
else if (maxValue == shardSplitPointValue)
|
||||
{
|
||||
int32 validSplitPoint = shardIntervalToSplit->maxValue - 1;
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg(
|
||||
"Invalid split point %d, as split points should be inclusive. Please use %d instead.",
|
||||
maxValue,
|
||||
validSplitPoint)));
|
||||
}
|
||||
|
||||
/* Split points should be in strictly increasing order */
|
||||
int32 lastShardSplitPointValue = DatumGetInt32(lastShardSplitPoint.value);
|
||||
|
@ -282,22 +295,6 @@ ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
|
|||
shardSplitPointValue)));
|
||||
}
|
||||
|
||||
/*
|
||||
* Given our split points inclusive, you cannot specify the max value in a range as a split point.
|
||||
* Example: Shard 81060002 range is from (0,1073741823). '1073741823' as split point is invalid.
|
||||
* '1073741822' is correct and will split shard to: (0, 1073741822) and (1073741823, 1073741823).
|
||||
*/
|
||||
if (maxValue == shardSplitPointValue)
|
||||
{
|
||||
int32 validSplitPoint = shardIntervalToSplit->maxValue - 1;
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg(
|
||||
"Invalid split point %d, as split points should be inclusive. Please use %d instead.",
|
||||
maxValue,
|
||||
validSplitPoint)));
|
||||
}
|
||||
|
||||
lastShardSplitPoint = (NullableDatum) {
|
||||
shardSplitPoint, false
|
||||
};
|
||||
|
|
|
@ -11,3 +11,4 @@ test: isolation_ref2ref_foreign_keys_enterprise
|
|||
test: isolation_pg_send_cancellation
|
||||
test: isolation_shard_move_vs_start_metadata_sync
|
||||
test: isolation_tenant_isolation
|
||||
test: isolation_blocking_shard_split
|
||||
|
|
|
@ -0,0 +1,951 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-begin s2-blocking-shard-split s1-update s2-commit s1-commit s2-print-cluster
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-load-cache:
|
||||
-- Indirect way to load cache.
|
||||
TRUNCATE to_split_table;
|
||||
|
||||
step s1-insert:
|
||||
-- Id '123456789' maps to shard xxxxx.
|
||||
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
|
||||
INSERT INTO to_split_table VALUES (123456789, 1);
|
||||
|
||||
get_shard_id_for_distribution_column
|
||||
---------------------------------------------------------------------
|
||||
1500002
|
||||
(1 row)
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
-- the tests are written with the logic where single shard SELECTs
|
||||
-- do not to open transaction blocks
|
||||
SET citus.select_opens_transaction_block TO false;
|
||||
|
||||
step s1-select:
|
||||
SELECT count(*) FROM to_split_table WHERE id = 123456789;
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-blocking-shard-split:
|
||||
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||
1500002,
|
||||
ARRAY['1073741824'],
|
||||
ARRAY[1, 2],
|
||||
'blocking');
|
||||
|
||||
citus_split_shard_by_split_points
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-update:
|
||||
UPDATE to_split_table SET value = 111 WHERE id = 123456789;
|
||||
<waiting ...>
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-update: <... completed>
|
||||
ERROR: could not find valid entry for shard xxxxx
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-print-cluster:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('to_split_table', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
-- rows
|
||||
SELECT id, value FROM to_split_table ORDER BY id, value;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|1500001|t | 0
|
||||
57637|1500003|t | 1
|
||||
57638|1500004|t | 0
|
||||
(3 rows)
|
||||
|
||||
id|value
|
||||
---------------------------------------------------------------------
|
||||
123456789| 1
|
||||
(1 row)
|
||||
|
||||
|
||||
starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-begin s2-blocking-shard-split s1-delete s2-commit s1-commit s2-print-cluster
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-load-cache:
|
||||
-- Indirect way to load cache.
|
||||
TRUNCATE to_split_table;
|
||||
|
||||
step s1-insert:
|
||||
-- Id '123456789' maps to shard xxxxx.
|
||||
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
|
||||
INSERT INTO to_split_table VALUES (123456789, 1);
|
||||
|
||||
get_shard_id_for_distribution_column
|
||||
---------------------------------------------------------------------
|
||||
1500002
|
||||
(1 row)
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
-- the tests are written with the logic where single shard SELECTs
|
||||
-- do not to open transaction blocks
|
||||
SET citus.select_opens_transaction_block TO false;
|
||||
|
||||
step s1-select:
|
||||
SELECT count(*) FROM to_split_table WHERE id = 123456789;
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-blocking-shard-split:
|
||||
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||
1500002,
|
||||
ARRAY['1073741824'],
|
||||
ARRAY[1, 2],
|
||||
'blocking');
|
||||
|
||||
citus_split_shard_by_split_points
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-delete:
|
||||
DELETE FROM to_split_table WHERE id = 123456789;
|
||||
<waiting ...>
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-delete: <... completed>
|
||||
ERROR: could not find valid entry for shard xxxxx
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-print-cluster:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('to_split_table', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
-- rows
|
||||
SELECT id, value FROM to_split_table ORDER BY id, value;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|1500001|t | 0
|
||||
57637|1500003|t | 1
|
||||
57638|1500004|t | 0
|
||||
(3 rows)
|
||||
|
||||
id|value
|
||||
---------------------------------------------------------------------
|
||||
123456789| 1
|
||||
(1 row)
|
||||
|
||||
|
||||
starting permutation: s1-load-cache s1-begin s1-select s2-begin s2-blocking-shard-split s1-insert s2-commit s1-commit s2-print-cluster
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-load-cache:
|
||||
-- Indirect way to load cache.
|
||||
TRUNCATE to_split_table;
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
-- the tests are written with the logic where single shard SELECTs
|
||||
-- do not to open transaction blocks
|
||||
SET citus.select_opens_transaction_block TO false;
|
||||
|
||||
step s1-select:
|
||||
SELECT count(*) FROM to_split_table WHERE id = 123456789;
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-blocking-shard-split:
|
||||
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||
1500002,
|
||||
ARRAY['1073741824'],
|
||||
ARRAY[1, 2],
|
||||
'blocking');
|
||||
|
||||
citus_split_shard_by_split_points
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-insert:
|
||||
-- Id '123456789' maps to shard xxxxx.
|
||||
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
|
||||
INSERT INTO to_split_table VALUES (123456789, 1);
|
||||
<waiting ...>
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-insert: <... completed>
|
||||
get_shard_id_for_distribution_column
|
||||
---------------------------------------------------------------------
|
||||
1500002
|
||||
(1 row)
|
||||
|
||||
ERROR: could not find valid entry for shard xxxxx
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-print-cluster:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('to_split_table', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
-- rows
|
||||
SELECT id, value FROM to_split_table ORDER BY id, value;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|1500001|t | 0
|
||||
57637|1500003|t | 0
|
||||
57638|1500004|t | 0
|
||||
(3 rows)
|
||||
|
||||
id|value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
|
||||
starting permutation: s1-load-cache s1-begin s1-select s2-begin s2-blocking-shard-split s1-copy s2-commit s1-commit s2-print-cluster
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-load-cache:
|
||||
-- Indirect way to load cache.
|
||||
TRUNCATE to_split_table;
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
-- the tests are written with the logic where single shard SELECTs
|
||||
-- do not to open transaction blocks
|
||||
SET citus.select_opens_transaction_block TO false;
|
||||
|
||||
step s1-select:
|
||||
SELECT count(*) FROM to_split_table WHERE id = 123456789;
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-blocking-shard-split:
|
||||
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||
1500002,
|
||||
ARRAY['1073741824'],
|
||||
ARRAY[1, 2],
|
||||
'blocking');
|
||||
|
||||
citus_split_shard_by_split_points
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-copy:
|
||||
COPY to_split_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
|
||||
<waiting ...>
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-copy: <... completed>
|
||||
ERROR: could not find valid entry for shard xxxxx
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-print-cluster:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('to_split_table', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
-- rows
|
||||
SELECT id, value FROM to_split_table ORDER BY id, value;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|1500001|t | 0
|
||||
57637|1500003|t | 0
|
||||
57638|1500004|t | 0
|
||||
(3 rows)
|
||||
|
||||
id|value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
|
||||
starting permutation: s1-insert s1-begin s1-select s2-begin s2-blocking-shard-split s1-update s2-commit s1-commit s2-print-cluster
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-insert:
|
||||
-- Id '123456789' maps to shard xxxxx.
|
||||
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
|
||||
INSERT INTO to_split_table VALUES (123456789, 1);
|
||||
|
||||
get_shard_id_for_distribution_column
|
||||
---------------------------------------------------------------------
|
||||
1500002
|
||||
(1 row)
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
-- the tests are written with the logic where single shard SELECTs
|
||||
-- do not to open transaction blocks
|
||||
SET citus.select_opens_transaction_block TO false;
|
||||
|
||||
step s1-select:
|
||||
SELECT count(*) FROM to_split_table WHERE id = 123456789;
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-blocking-shard-split:
|
||||
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||
1500002,
|
||||
ARRAY['1073741824'],
|
||||
ARRAY[1, 2],
|
||||
'blocking');
|
||||
|
||||
citus_split_shard_by_split_points
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-update:
|
||||
UPDATE to_split_table SET value = 111 WHERE id = 123456789;
|
||||
<waiting ...>
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-update: <... completed>
|
||||
ERROR: could not find valid entry for shard xxxxx
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-print-cluster:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('to_split_table', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
-- rows
|
||||
SELECT id, value FROM to_split_table ORDER BY id, value;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|1500001|t | 0
|
||||
57637|1500003|t | 1
|
||||
57638|1500004|t | 0
|
||||
(3 rows)
|
||||
|
||||
id|value
|
||||
---------------------------------------------------------------------
|
||||
123456789| 1
|
||||
(1 row)
|
||||
|
||||
|
||||
starting permutation: s1-insert s1-begin s1-select s2-begin s2-blocking-shard-split s1-delete s2-commit s1-commit s2-print-cluster
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-insert:
|
||||
-- Id '123456789' maps to shard xxxxx.
|
||||
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
|
||||
INSERT INTO to_split_table VALUES (123456789, 1);
|
||||
|
||||
get_shard_id_for_distribution_column
|
||||
---------------------------------------------------------------------
|
||||
1500002
|
||||
(1 row)
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
-- the tests are written with the logic where single shard SELECTs
|
||||
-- do not to open transaction blocks
|
||||
SET citus.select_opens_transaction_block TO false;
|
||||
|
||||
step s1-select:
|
||||
SELECT count(*) FROM to_split_table WHERE id = 123456789;
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-blocking-shard-split:
|
||||
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||
1500002,
|
||||
ARRAY['1073741824'],
|
||||
ARRAY[1, 2],
|
||||
'blocking');
|
||||
|
||||
citus_split_shard_by_split_points
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-delete:
|
||||
DELETE FROM to_split_table WHERE id = 123456789;
|
||||
<waiting ...>
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-delete: <... completed>
|
||||
ERROR: could not find valid entry for shard xxxxx
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-print-cluster:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('to_split_table', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
-- rows
|
||||
SELECT id, value FROM to_split_table ORDER BY id, value;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|1500001|t | 0
|
||||
57637|1500003|t | 1
|
||||
57638|1500004|t | 0
|
||||
(3 rows)
|
||||
|
||||
id|value
|
||||
---------------------------------------------------------------------
|
||||
123456789| 1
|
||||
(1 row)
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-select s2-begin s2-blocking-shard-split s1-insert s2-commit s1-commit s2-print-cluster
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
-- the tests are written with the logic where single shard SELECTs
|
||||
-- do not to open transaction blocks
|
||||
SET citus.select_opens_transaction_block TO false;
|
||||
|
||||
step s1-select:
|
||||
SELECT count(*) FROM to_split_table WHERE id = 123456789;
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-blocking-shard-split:
|
||||
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||
1500002,
|
||||
ARRAY['1073741824'],
|
||||
ARRAY[1, 2],
|
||||
'blocking');
|
||||
|
||||
citus_split_shard_by_split_points
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-insert:
|
||||
-- Id '123456789' maps to shard xxxxx.
|
||||
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
|
||||
INSERT INTO to_split_table VALUES (123456789, 1);
|
||||
<waiting ...>
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-insert: <... completed>
|
||||
get_shard_id_for_distribution_column
|
||||
---------------------------------------------------------------------
|
||||
1500002
|
||||
(1 row)
|
||||
|
||||
ERROR: could not find valid entry for shard xxxxx
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-print-cluster:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('to_split_table', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
-- rows
|
||||
SELECT id, value FROM to_split_table ORDER BY id, value;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|1500001|t | 0
|
||||
57637|1500003|t | 0
|
||||
57638|1500004|t | 0
|
||||
(3 rows)
|
||||
|
||||
id|value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-select s2-begin s2-blocking-shard-split s1-copy s2-commit s1-commit s2-print-cluster
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
-- the tests are written with the logic where single shard SELECTs
|
||||
-- do not to open transaction blocks
|
||||
SET citus.select_opens_transaction_block TO false;
|
||||
|
||||
step s1-select:
|
||||
SELECT count(*) FROM to_split_table WHERE id = 123456789;
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-blocking-shard-split:
|
||||
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||
1500002,
|
||||
ARRAY['1073741824'],
|
||||
ARRAY[1, 2],
|
||||
'blocking');
|
||||
|
||||
citus_split_shard_by_split_points
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-copy:
|
||||
COPY to_split_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
|
||||
<waiting ...>
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-copy: <... completed>
|
||||
ERROR: could not find valid entry for shard xxxxx
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-print-cluster:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('to_split_table', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
-- rows
|
||||
SELECT id, value FROM to_split_table ORDER BY id, value;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|1500001|t | 0
|
||||
57637|1500003|t | 0
|
||||
57638|1500004|t | 0
|
||||
(3 rows)
|
||||
|
||||
id|value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
|
||||
starting permutation: s1-load-cache s1-insert s1-begin s1-blocking-shard-split s2-blocking-shard-split s1-commit s2-print-cluster
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-load-cache:
|
||||
-- Indirect way to load cache.
|
||||
TRUNCATE to_split_table;
|
||||
|
||||
step s1-insert:
|
||||
-- Id '123456789' maps to shard xxxxx.
|
||||
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
|
||||
INSERT INTO to_split_table VALUES (123456789, 1);
|
||||
|
||||
get_shard_id_for_distribution_column
|
||||
---------------------------------------------------------------------
|
||||
1500002
|
||||
(1 row)
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
-- the tests are written with the logic where single shard SELECTs
|
||||
-- do not to open transaction blocks
|
||||
SET citus.select_opens_transaction_block TO false;
|
||||
|
||||
step s1-blocking-shard-split:
|
||||
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||
1500001,
|
||||
ARRAY['-1073741824'],
|
||||
ARRAY[1, 2],
|
||||
'blocking');
|
||||
|
||||
citus_split_shard_by_split_points
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-blocking-shard-split:
|
||||
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||
1500002,
|
||||
ARRAY['1073741824'],
|
||||
ARRAY[1, 2],
|
||||
'blocking');
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-blocking-shard-split: <... completed>
|
||||
citus_split_shard_by_split_points
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-print-cluster:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('to_split_table', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
-- rows
|
||||
SELECT id, value FROM to_split_table ORDER BY id, value;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|1500003|t | 0
|
||||
57637|1500005|t | 1
|
||||
57638|1500004|t | 0
|
||||
57638|1500006|t | 0
|
||||
(4 rows)
|
||||
|
||||
id|value
|
||||
---------------------------------------------------------------------
|
||||
123456789| 1
|
||||
(1 row)
|
||||
|
||||
|
||||
starting permutation: s1-insert s1-begin s1-blocking-shard-split s2-blocking-shard-split s1-commit s2-print-cluster
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-insert:
|
||||
-- Id '123456789' maps to shard xxxxx.
|
||||
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
|
||||
INSERT INTO to_split_table VALUES (123456789, 1);
|
||||
|
||||
get_shard_id_for_distribution_column
|
||||
---------------------------------------------------------------------
|
||||
1500002
|
||||
(1 row)
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
-- the tests are written with the logic where single shard SELECTs
|
||||
-- do not to open transaction blocks
|
||||
SET citus.select_opens_transaction_block TO false;
|
||||
|
||||
step s1-blocking-shard-split:
|
||||
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||
1500001,
|
||||
ARRAY['-1073741824'],
|
||||
ARRAY[1, 2],
|
||||
'blocking');
|
||||
|
||||
citus_split_shard_by_split_points
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-blocking-shard-split:
|
||||
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||
1500002,
|
||||
ARRAY['1073741824'],
|
||||
ARRAY[1, 2],
|
||||
'blocking');
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-blocking-shard-split: <... completed>
|
||||
citus_split_shard_by_split_points
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-print-cluster:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('to_split_table', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
-- rows
|
||||
SELECT id, value FROM to_split_table ORDER BY id, value;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|1500003|t | 0
|
||||
57637|1500005|t | 1
|
||||
57638|1500004|t | 0
|
||||
57638|1500006|t | 0
|
||||
(4 rows)
|
||||
|
||||
id|value
|
||||
---------------------------------------------------------------------
|
||||
123456789| 1
|
||||
(1 row)
|
||||
|
||||
|
||||
starting permutation: s1-load-cache s1-begin s1-select s2-begin s2-blocking-shard-split s1-ddl s2-commit s1-commit s2-print-cluster s2-print-index-count
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-load-cache:
|
||||
-- Indirect way to load cache.
|
||||
TRUNCATE to_split_table;
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
-- the tests are written with the logic where single shard SELECTs
|
||||
-- do not to open transaction blocks
|
||||
SET citus.select_opens_transaction_block TO false;
|
||||
|
||||
step s1-select:
|
||||
SELECT count(*) FROM to_split_table WHERE id = 123456789;
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-blocking-shard-split:
|
||||
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||
1500002,
|
||||
ARRAY['1073741824'],
|
||||
ARRAY[1, 2],
|
||||
'blocking');
|
||||
|
||||
citus_split_shard_by_split_points
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-ddl:
|
||||
CREATE INDEX test_table_index ON to_split_table(id);
|
||||
<waiting ...>
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-ddl: <... completed>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-print-cluster:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('to_split_table', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
-- rows
|
||||
SELECT id, value FROM to_split_table ORDER BY id, value;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|1500001|t | 0
|
||||
57637|1500003|t | 0
|
||||
57638|1500004|t | 0
|
||||
(3 rows)
|
||||
|
||||
id|value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
step s2-print-index-count:
|
||||
SELECT
|
||||
nodeport, success, result
|
||||
FROM
|
||||
run_command_on_placements('to_split_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
|
||||
ORDER BY
|
||||
nodeport;
|
||||
|
||||
nodeport|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|t | 1
|
||||
57637|t | 1
|
||||
57638|t | 1
|
||||
(3 rows)
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-select s2-begin s2-blocking-shard-split s1-ddl s2-commit s1-commit s2-print-cluster s2-print-index-count
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
-- the tests are written with the logic where single shard SELECTs
|
||||
-- do not to open transaction blocks
|
||||
SET citus.select_opens_transaction_block TO false;
|
||||
|
||||
step s1-select:
|
||||
SELECT count(*) FROM to_split_table WHERE id = 123456789;
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-blocking-shard-split:
|
||||
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||
1500002,
|
||||
ARRAY['1073741824'],
|
||||
ARRAY[1, 2],
|
||||
'blocking');
|
||||
|
||||
citus_split_shard_by_split_points
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-ddl:
|
||||
CREATE INDEX test_table_index ON to_split_table(id);
|
||||
<waiting ...>
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-ddl: <... completed>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-print-cluster:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('to_split_table', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
-- rows
|
||||
SELECT id, value FROM to_split_table ORDER BY id, value;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|1500001|t | 0
|
||||
57637|1500003|t | 0
|
||||
57638|1500004|t | 0
|
||||
(3 rows)
|
||||
|
||||
id|value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
step s2-print-index-count:
|
||||
SELECT
|
||||
nodeport, success, result
|
||||
FROM
|
||||
run_command_on_placements('to_split_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
|
||||
ORDER BY
|
||||
nodeport;
|
||||
|
||||
nodeport|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|t | 1
|
||||
57637|t | 1
|
||||
57638|t | 1
|
||||
(3 rows)
|
||||
|
|
@ -0,0 +1,146 @@
|
|||
setup
|
||||
{
|
||||
SET citus.shard_count to 2;
|
||||
SET citus.shard_replication_factor to 1;
|
||||
select setval('pg_dist_shardid_seq', 1500000);
|
||||
|
||||
CREATE TABLE to_split_table (id int, value int);
|
||||
SELECT create_distributed_table('to_split_table', 'id');
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
DROP TABLE to_split_table;
|
||||
}
|
||||
|
||||
session "s1"
|
||||
|
||||
step "s1-begin"
|
||||
{
|
||||
BEGIN;
|
||||
|
||||
-- the tests are written with the logic where single shard SELECTs
|
||||
-- do not to open transaction blocks
|
||||
SET citus.select_opens_transaction_block TO false;
|
||||
}
|
||||
|
||||
// cache all placements
|
||||
step "s1-load-cache"
|
||||
{
|
||||
-- Indirect way to load cache.
|
||||
TRUNCATE to_split_table;
|
||||
}
|
||||
|
||||
step "s1-insert"
|
||||
{
|
||||
-- Id '123456789' maps to shard 1500002.
|
||||
SELECT get_shard_id_for_distribution_column('to_split_table', 123456789);
|
||||
|
||||
INSERT INTO to_split_table VALUES (123456789, 1);
|
||||
}
|
||||
|
||||
step "s1-update"
|
||||
{
|
||||
UPDATE to_split_table SET value = 111 WHERE id = 123456789;
|
||||
}
|
||||
|
||||
step "s1-delete"
|
||||
{
|
||||
DELETE FROM to_split_table WHERE id = 123456789;
|
||||
}
|
||||
|
||||
step "s1-select"
|
||||
{
|
||||
SELECT count(*) FROM to_split_table WHERE id = 123456789;
|
||||
}
|
||||
|
||||
step "s1-ddl"
|
||||
{
|
||||
CREATE INDEX test_table_index ON to_split_table(id);
|
||||
}
|
||||
|
||||
step "s1-copy"
|
||||
{
|
||||
COPY to_split_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
|
||||
}
|
||||
|
||||
step "s1-blocking-shard-split"
|
||||
{
|
||||
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||
1500001,
|
||||
ARRAY['-1073741824'],
|
||||
ARRAY[1, 2],
|
||||
'blocking');
|
||||
}
|
||||
|
||||
step "s1-commit"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
session "s2"
|
||||
|
||||
step "s2-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
step "s2-blocking-shard-split"
|
||||
{
|
||||
SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||
1500002,
|
||||
ARRAY['1073741824'],
|
||||
ARRAY[1, 2],
|
||||
'blocking');
|
||||
}
|
||||
|
||||
step "s2-commit"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
step "s2-print-cluster"
|
||||
{
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('to_split_table', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
|
||||
-- rows
|
||||
SELECT id, value FROM to_split_table ORDER BY id, value;
|
||||
}
|
||||
|
||||
step "s2-print-index-count"
|
||||
{
|
||||
SELECT
|
||||
nodeport, success, result
|
||||
FROM
|
||||
run_command_on_placements('to_split_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
|
||||
ORDER BY
|
||||
nodeport;
|
||||
}
|
||||
|
||||
// Run shard split while concurrently performing DML and index creation
|
||||
// We expect DML,Copy to fail because the shard they are waiting for is destroyed.
|
||||
permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-update" "s2-commit" "s1-commit" "s2-print-cluster"
|
||||
permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-delete" "s2-commit" "s1-commit" "s2-print-cluster"
|
||||
permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-insert" "s2-commit" "s1-commit" "s2-print-cluster"
|
||||
permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-copy" "s2-commit" "s1-commit" "s2-print-cluster"
|
||||
// The same tests without loading the cache at first
|
||||
permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-update" "s2-commit" "s1-commit" "s2-print-cluster"
|
||||
permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-delete" "s2-commit" "s1-commit" "s2-print-cluster"
|
||||
permutation "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-insert" "s2-commit" "s1-commit" "s2-print-cluster"
|
||||
permutation "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-copy" "s2-commit" "s1-commit" "s2-print-cluster"
|
||||
|
||||
// Concurrent shard split blocks on different shards of the same table (or any colocated table)
|
||||
permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-blocking-shard-split" "s2-blocking-shard-split" "s1-commit" "s2-print-cluster"
|
||||
// The same test above without loading the cache at first
|
||||
permutation "s1-insert" "s1-begin" "s1-blocking-shard-split" "s2-blocking-shard-split" "s1-commit" "s2-print-cluster"
|
||||
|
||||
// Concurrent DDL blocks on different shards of the same table (or any colocated table)
|
||||
permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-ddl" "s2-commit" "s1-commit" "s2-print-cluster" "s2-print-index-count"
|
||||
// The same tests without loading the cache at first
|
||||
permutation "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-ddl" "s2-commit" "s1-commit" "s2-print-cluster" "s2-print-index-count"
|
Loading…
Reference in New Issue