Add colocation checks for shard splits

pull/6066/head
Onder Kalaci 2022-07-21 13:13:02 +02:00
parent 12fa3aaf6b
commit 5bc8a81aa7
5 changed files with 349 additions and 333 deletions

View File

@ -35,6 +35,7 @@
#include "distributed/metadata_sync.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/shard_rebalancer.h"
/*
* Entry for map that tracks ShardInterval -> Placement Node
@ -329,6 +330,9 @@ SplitShard(SplitMode splitMode,
ShardInterval *shardIntervalToSplit = LoadShardInterval(shardIdToSplit);
List *colocatedTableList = ColocatedTableList(shardIntervalToSplit->relationId);
Oid relationId = RelationIdForShard(shardIdToSplit);
AcquirePlacementColocationLock(relationId, ExclusiveLock, "split");
/* sort the tables to avoid deadlocks */
colocatedTableList = SortList(colocatedTableList, CompareOids);
Oid colocatedTableId = InvalidOid;

View File

@ -670,16 +670,11 @@ step s2-blocking-shard-split:
ARRAY['1073741824'],
ARRAY[1, 2],
'block_writes');
<waiting ...>
ERROR: could not acquire the lock required to split public.to_split_table
step s1-commit:
COMMIT;
step s2-blocking-shard-split: <... completed>
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
step s2-print-cluster:
-- row count per shard
SELECT
@ -694,10 +689,9 @@ step s2-print-cluster:
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500003|t | 0
57637|1500005|t | 1
57638|1500002|t | 1
57638|1500004|t | 0
57638|1500006|t | 0
(4 rows)
(3 rows)
id|value
---------------------------------------------------------------------
@ -745,16 +739,11 @@ step s2-blocking-shard-split:
ARRAY['1073741824'],
ARRAY[1, 2],
'block_writes');
<waiting ...>
ERROR: could not acquire the lock required to split public.to_split_table
step s1-commit:
COMMIT;
step s2-blocking-shard-split: <... completed>
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
step s2-print-cluster:
-- row count per shard
SELECT
@ -769,10 +758,9 @@ step s2-print-cluster:
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500003|t | 0
57637|1500005|t | 1
57638|1500002|t | 1
57638|1500004|t | 0
57638|1500006|t | 0
(4 rows)
(3 rows)
id|value
---------------------------------------------------------------------

View File

@ -1,11 +1,6 @@
Parsed test spec with 5 sessions
starting permutation: s2-begin s2-create_distributed_table s3-create_distributed_table s2-commit
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
@ -30,11 +25,6 @@ step s2-commit:
starting permutation: s2-begin s2-create_distributed_table s1-move-shard-logical s2-commit s3-sanity-check s3-sanity-check-2
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
@ -72,11 +62,6 @@ count
starting permutation: s2-begin s2-create_distributed_table s1-move-shard-block s2-commit s3-sanity-check s3-sanity-check-2
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
@ -113,12 +98,45 @@ count
(1 row)
starting permutation: s4-begin s4-move-shard-logical s5-setup-rep-factor s5-create_implicit_colocated_distributed_table s4-commit s3-sanity-check s3-sanity-check-3 s3-sanity-check-4
starting permutation: s2-begin s2-create_distributed_table s1-split-block s2-commit s3-sanity-check s3-sanity-check-2
step s2-begin:
BEGIN;
step s2-create_distributed_table:
SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s1-split-block:
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
SELECT citus_split_shard_by_split_points(
shardid.shardid, ARRAY['2113265921'], ARRAY[(SELECT * FROM first_node_id), (SELECT * FROM first_node_id)], 'block_writes') FROM shardid;
ERROR: could not acquire the lock required to split public.concurrent_table_1
step s2-commit:
COMMIT;
step s3-sanity-check:
SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL;
count
---------------------------------------------------------------------
0
(1 row)
step s3-sanity-check-2:
SELECT count(*) FROM concurrent_table_1 JOIN concurrent_table_2 USING (id);
count
---------------------------------------------------------------------
0
(1 row)
starting permutation: s4-begin s4-move-shard-logical s5-setup-rep-factor s5-create_implicit_colocated_distributed_table s4-commit s3-sanity-check s3-sanity-check-3 s3-sanity-check-4
step s4-begin:
BEGIN;
@ -167,11 +185,6 @@ count
starting permutation: s4-begin s4-move-shard-block s5-setup-rep-factor s5-create_implicit_colocated_distributed_table s4-commit s3-sanity-check s3-sanity-check-3 s3-sanity-check-4
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s4-begin:
BEGIN;

View File

@ -125,22 +125,22 @@ step "s2-print-index-count"
// Run shard split while concurrently performing DML and index creation
// We expect DML,Copy to fail because the shard they are waiting for is destroyed.
permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-update" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-delete" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-insert" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-copy" "s2-commit" "s1-commit" "s2-print-cluster"
// The same tests without loading the cache at first
permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-update" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-delete" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-insert" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-copy" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-update" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-delete" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-insert" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-copy" "s2-commit" "s1-commit" "s2-print-cluster"
// The same tests without loading the cache at first
permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-update" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-insert" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-delete" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-insert" "s2-commit" "s1-commit" "s2-print-cluster"
permutation "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-copy" "s2-commit" "s1-commit" "s2-print-cluster"
// Concurrent shard split blocks on different shards of the same table (or any colocated table)
permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-blocking-shard-split" "s2-blocking-shard-split" "s1-commit" "s2-print-cluster"
// The same test above without loading the cache at first
permutation "s1-insert" "s1-begin" "s1-blocking-shard-split" "s2-blocking-shard-split" "s1-commit" "s2-print-cluster"
permutation "s1-load-cache" "s1-insert" "s1-begin" "s1-blocking-shard-split" "s2-blocking-shard-split" "s1-commit" "s2-print-cluster"
// The same test above without loading the cache at first
permutation "s1-insert" "s1-begin" "s1-blocking-shard-split" "s2-blocking-shard-split" "s1-commit" "s2-print-cluster"
// Concurrent DDL blocks on different shards of the same table (or any colocated table)
permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-ddl" "s2-commit" "s1-commit" "s2-print-cluster" "s2-print-index-count"
// The same tests without loading the cache at first
permutation "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-ddl" "s2-commit" "s1-commit" "s2-print-cluster" "s2-print-index-count"
permutation "s1-load-cache" "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-ddl" "s2-commit" "s1-commit" "s2-print-cluster" "s2-print-index-count"
// The same tests without loading the cache at first
permutation "s1-begin" "s1-select" "s2-begin" "s2-blocking-shard-split" "s1-ddl" "s2-commit" "s1-commit" "s2-print-cluster" "s2-print-index-count"

View File

@ -9,11 +9,13 @@ setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('concurrent_table_1', 'id', colocate_with := 'none');
SELECT create_distributed_table('concurrent_table_4', 'id');
SELECT nodeid INTO first_node_id FROM pg_dist_node WHERE nodeport = 57637;
}
teardown
{
DROP TABLE concurrent_table_1, concurrent_table_2, concurrent_table_3, concurrent_table_4, concurrent_table_5 CASCADE;
DROP TABLE concurrent_table_1, concurrent_table_2, concurrent_table_3, concurrent_table_4, concurrent_table_5, first_node_id CASCADE;
}
session "s1"
@ -31,6 +33,14 @@ step "s1-move-shard-block"
SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638, 'block_writes') FROM shardid;
}
step "s1-split-block"
{
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
SELECT citus_split_shard_by_split_points(
shardid.shardid, ARRAY['2113265921'], ARRAY[(SELECT * FROM first_node_id), (SELECT * FROM first_node_id)], 'block_writes') FROM shardid;
}
session "s2"
step "s2-begin"
@ -119,6 +129,7 @@ permutation "s2-begin" "s2-create_distributed_table" "s3-create_distributed_tab
// concurrent create colocated table and shard move properly block each other, and cluster is healthy
permutation "s2-begin" "s2-create_distributed_table" "s1-move-shard-logical" "s2-commit" "s3-sanity-check" "s3-sanity-check-2"
permutation "s2-begin" "s2-create_distributed_table" "s1-move-shard-block" "s2-commit" "s3-sanity-check" "s3-sanity-check-2"
permutation "s2-begin" "s2-create_distributed_table" "s1-split-block" "s2-commit" "s3-sanity-check" "s3-sanity-check-2"
// same test above, but this time implicitly colocated tables
permutation "s4-begin" "s4-move-shard-logical" "s5-setup-rep-factor" "s5-create_implicit_colocated_distributed_table" "s4-commit" "s3-sanity-check" "s3-sanity-check-3" "s3-sanity-check-4"