Merge pull request #6066 from citusdata/fix_colocation_lock

Use colocation locks in lower level UDFs and create_distributed_table
Önder Kalacı 2022-07-27 10:10:25 +02:00 committed by GitHub
commit 14c56ce0dd
16 changed files with 868 additions and 448 deletions


@@ -60,6 +60,7 @@
 #include "distributed/relation_access_tracking.h"
 #include "distributed/remote_commands.h"
 #include "distributed/shared_library_init.h"
+#include "distributed/shard_rebalancer.h"
 #include "distributed/worker_protocol.h"
 #include "distributed/worker_shard_visibility.h"
 #include "distributed/worker_transaction.h"
@@ -850,6 +851,17 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
 	if (colocatedTableId != InvalidOid)
 	{
+		/*
+		 * We currently allow concurrent distribution of colocated tables (which
+		 * we probably should not be allowing because of foreign keys /
+		 * partitioning etc).
+		 *
+		 * We also prevent concurrent shard moves / copies / splits while
+		 * creating a colocated table.
+		 */
+		AcquirePlacementColocationLock(colocatedTableId, ShareLock,
+									   "colocate distributed table");
+
 		CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection);
 	}
 	else
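The ShareLock mode above is the crux of the change: concurrent create_distributed_table calls on the same colocation group each take ShareLock, which does not conflict with itself, while the move/copy/split paths below take ExclusiveLock on the same lock tag. A sketch of how these modes interact under PostgreSQL's standard lock conflict rules (illustrative summary only, not part of the diff):

/*
 * Illustrative conflict summary for the placement colocation lock:
 *
 *   create_distributed_table (colocated)    -> ShareLock
 *   citus_move/copy_shard_placement, split  -> ExclusiveLock
 *
 * ShareLock vs ShareLock         : no conflict (concurrent colocated creates proceed)
 * ShareLock vs ExclusiveLock     : conflict    (creates and moves serialize)
 * ExclusiveLock vs ExclusiveLock : conflict    (moves/copies/splits serialize)
 */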


@@ -37,6 +37,7 @@
 #include "distributed/reference_table_utils.h"
 #include "distributed/remote_commands.h"
 #include "distributed/resource_lock.h"
+#include "distributed/shard_rebalancer.h"
 #include "distributed/worker_manager.h"
 #include "distributed/worker_protocol.h"
 #include "distributed/worker_transaction.h"
@@ -179,6 +180,9 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS)
 	ShardInterval *shardInterval = LoadShardInterval(shardId);
 	ErrorIfTableCannotBeReplicated(shardInterval->relationId);
+
+	AcquirePlacementColocationLock(shardInterval->relationId, ExclusiveLock,
+								   doRepair ? "repair" : "copy");
 	if (doRepair)
 	{
 		RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,
@@ -332,6 +336,8 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
 	ErrorIfMoveUnsupportedTableType(relationId);
 	ErrorIfTargetNodeIsNotSafeToMove(targetNodeName, targetNodePort);
+
+	AcquirePlacementColocationLock(relationId, ExclusiveLock, "move");
 	ShardInterval *shardInterval = LoadShardInterval(shardId);
 	Oid distributedTableId = shardInterval->relationId;


@@ -227,7 +227,7 @@ static float4 NodeCapacity(WorkerNode *workerNode, void *context);
 static ShardCost GetShardCost(uint64 shardId, void *context);
 static List * NonColocatedDistRelationIdList(void);
 static void RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid);
-static void AcquireColocationLock(Oid relationId, const char *operationName);
+static void AcquireRebalanceColocationLock(Oid relationId, const char *operationName);
 static void ExecutePlacementUpdates(List *placementUpdateList, Oid
 									shardReplicationModeOid, char *noticeOperation);
 static float4 CalculateUtilization(float4 totalCost, float4 capacity);
@@ -616,13 +616,13 @@ GetColocatedRebalanceSteps(List *placementUpdateList)
 /*
- * AcquireColocationLock tries to acquire a lock for rebalance/replication. If
- * this is it not possible it fails instantly because this means another
- * rebalance/replication is currently happening. This would really mess up
- * planning.
+ * AcquireRebalanceColocationLock tries to acquire a lock for
+ * rebalance/replication. If this is not possible, it fails
+ * instantly because this means another rebalance/replication
+ * is currently happening. This would really mess up planning.
  */
 static void
-AcquireColocationLock(Oid relationId, const char *operationName)
+AcquireRebalanceColocationLock(Oid relationId, const char *operationName)
 {
 	uint32 lockId = relationId;
 	LOCKTAG tag;
@@ -639,8 +639,48 @@ AcquireColocationLock(Oid relationId, const char *operationName)
 	if (!lockAcquired)
 	{
 		ereport(ERROR, (errmsg("could not acquire the lock required to %s %s",
-							   operationName, generate_qualified_relation_name(
-								   relationId))));
+							   operationName,
+							   generate_qualified_relation_name(relationId)),
+						errdetail("It means that either a concurrent shard move "
+								  "or shard copy is happening."),
+						errhint("Make sure that the concurrent operation has "
+								"finished and re-run the command")));
+	}
+}
+
+
+/*
+ * AcquirePlacementColocationLock tries to acquire a lock for
+ * rebalance/replication while moving/copying the placement. If this
+ * is not possible, it fails instantly because this means another
+ * move/copy is currently happening. This would really mess up planning.
+ */
+void
+AcquirePlacementColocationLock(Oid relationId, int lockMode,
+							   const char *operationName)
+{
+	uint32 lockId = relationId;
+	LOCKTAG tag;
+
+	CitusTableCacheEntry *citusTableCacheEntry = GetCitusTableCacheEntry(relationId);
+	if (citusTableCacheEntry->colocationId != INVALID_COLOCATION_ID)
+	{
+		lockId = citusTableCacheEntry->colocationId;
+	}
+
+	SET_LOCKTAG_REBALANCE_PLACEMENT_COLOCATION(tag, (int64) lockId);
+
+	LockAcquireResult lockAcquired = LockAcquire(&tag, lockMode, false, true);
+	if (!lockAcquired)
+	{
+		ereport(ERROR, (errmsg("could not acquire the lock required to %s %s",
+							   operationName,
+							   generate_qualified_relation_name(relationId)),
+						errdetail("It means that either a concurrent shard move "
+								  "or colocated distributed table creation is "
+								  "happening."),
+						errhint("Make sure that the concurrent operation has "
+								"finished and re-run the command")));
 	}
 }
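Both helpers pass dontWait = true to LockAcquire, so a conflict is reported immediately instead of queueing behind the current holder. A minimal sketch of that fail-fast pattern, assuming PostgreSQL's lmgr API (LockAcquire, LOCKACQUIRE_NOT_AVAIL); the helper name is hypothetical and not part of this diff:

/* Sketch: fail-fast advisory locking, the pattern used by both functions above.
 * TryAdvisoryLockOrError is a hypothetical name for illustration only. */
static void
TryAdvisoryLockOrError(LOCKTAG *tag, LOCKMODE lockMode, const char *operationName)
{
	const bool sessionLock = false; /* released automatically at transaction end */
	const bool dontWait = true;     /* return instead of sleeping on a conflict */

	LockAcquireResult result = LockAcquire(tag, lockMode, sessionLock, dontWait);
	if (result == LOCKACQUIRE_NOT_AVAIL)
	{
		/* only possible when dontWait is true; otherwise LockAcquire blocks */
		ereport(ERROR, (errmsg("could not acquire the lock required to %s",
							   operationName)));
	}
}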
@@ -942,7 +982,7 @@ replicate_table_shards(PG_FUNCTION_ARGS)
 	char transferMode = LookupShardTransferMode(shardReplicationModeOid);
 	EnsureReferenceTablesExistOnAllNodesExtended(transferMode);
-	AcquireColocationLock(relationId, "replicate");
+	AcquireRebalanceColocationLock(relationId, "replicate");
 
 	List *activeWorkerList = SortedActiveWorkers();
 	List *shardPlacementList = FullShardPlacementList(relationId, excludedShardArray);
@@ -1555,7 +1595,7 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)
 	foreach_oid(relationId, options->relationIdList)
 	{
-		AcquireColocationLock(relationId, operationName);
+		AcquireRebalanceColocationLock(relationId, operationName);
 	}
 
 	List *placementUpdateList = GetRebalanceSteps(options);


@@ -35,6 +35,7 @@
 #include "distributed/metadata_sync.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/deparse_shard_query.h"
+#include "distributed/shard_rebalancer.h"
 
 /*
  * Entry for map that tracks ShardInterval -> Placement Node
@@ -329,6 +330,9 @@ SplitShard(SplitMode splitMode,
 	ShardInterval *shardIntervalToSplit = LoadShardInterval(shardIdToSplit);
 	List *colocatedTableList = ColocatedTableList(shardIntervalToSplit->relationId);
 
+	Oid relationId = RelationIdForShard(shardIdToSplit);
+	AcquirePlacementColocationLock(relationId, ExclusiveLock, "split");
+
 	/* sort the tables to avoid deadlocks */
 	colocatedTableList = SortList(colocatedTableList, CompareOids);
 	Oid colocatedTableId = InvalidOid;


@@ -41,7 +41,8 @@ typedef enum AdvisoryLocktagClass
 	ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
 	ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9,
 	ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP = 10,
-	ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12
+	ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12,
+	ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13
 } AdvisoryLocktagClass;
 
 /* CitusOperations has constants for citus operations */
@@ -84,6 +85,17 @@ typedef enum CitusOperations
 			   (uint32) (colocationOrTableId), \
 			   ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION)
 
+/* reuse the advisory lock, but with a different, previously unused value for
+ * field 4 (13). It also has the database hardcoded to MyDatabaseId, to ensure
+ * the locks are local to each database. */
+#define SET_LOCKTAG_REBALANCE_PLACEMENT_COLOCATION(tag, colocationOrTableId) \
+	SET_LOCKTAG_ADVISORY(tag, \
+						 MyDatabaseId, \
+						 (uint32) ((colocationOrTableId) >> 32), \
+						 (uint32) (colocationOrTableId), \
+						 ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION)
+
 /* advisory lock for citus operations, also it has the database hardcoded to MyDatabaseId,
  * to ensure the locks are local to each database */
 #define SET_LOCKTAG_CITUS_OPERATION(tag, operationId) \
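For context, SET_LOCKTAG_ADVISORY is PostgreSQL's stock macro from storage/lock.h; it fills the four advisory key fields directly, which is what leaves field 4 free to carry the Citus locktag class. Quoted here for reference, assuming a recent PostgreSQL version; it is not part of this diff:

#define SET_LOCKTAG_ADVISORY(locktag, id1, id2, id3, id4) \
	((locktag).locktag_field1 = (id1), \
	 (locktag).locktag_field2 = (id2), \
	 (locktag).locktag_field3 = (id3), \
	 (locktag).locktag_field4 = (id4), \
	 (locktag).locktag_type = LOCKTAG_ADVISORY, \
	 (locktag).locktag_lockmethodid = USER_LOCKMETHOD)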


@@ -194,6 +194,7 @@ extern List * RebalancePlacementUpdates(List *workerNodeList,
 extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList,
 										  int shardReplicationFactor);
 extern void ExecuteRebalancerCommandInSeparateTransaction(char *command);
+extern void AcquirePlacementColocationLock(Oid relationId, int lockMode,
+										   const char *operationName);
 
 #endif   /* SHARD_REBALANCER_H */


@@ -670,16 +670,11 @@ step s2-blocking-shard-split:
 	ARRAY['1073741824'],
 	ARRAY[1, 2],
 	'block_writes');
-<waiting ...>
+ERROR: could not acquire the lock required to split public.to_split_table
 step s1-commit:
 	COMMIT;
 
-step s2-blocking-shard-split: <... completed>
-citus_split_shard_by_split_points
----------------------------------------------------------------------
-
-(1 row)
-
 step s2-print-cluster:
 	-- row count per shard
 	SELECT
@@ -694,10 +689,9 @@ step s2-print-cluster:
 nodeport|shardid|success|result
 ---------------------------------------------------------------------
    57637|1500003|t      |     0
-   57637|1500005|t      |     1
+   57638|1500002|t      |     1
    57638|1500004|t      |     0
-   57638|1500006|t      |     0
-(4 rows)
+(3 rows)
 
 id|value
 ---------------------------------------------------------------------
@@ -745,16 +739,11 @@ step s2-blocking-shard-split:
 	ARRAY['1073741824'],
 	ARRAY[1, 2],
 	'block_writes');
-<waiting ...>
+ERROR: could not acquire the lock required to split public.to_split_table
 step s1-commit:
 	COMMIT;
 
-step s2-blocking-shard-split: <... completed>
-citus_split_shard_by_split_points
----------------------------------------------------------------------
-
-(1 row)
-
 step s2-print-cluster:
 	-- row count per shard
 	SELECT
@@ -769,10 +758,9 @@ step s2-print-cluster:
 nodeport|shardid|success|result
 ---------------------------------------------------------------------
    57637|1500003|t      |     0
-   57637|1500005|t      |     1
+   57638|1500002|t      |     1
    57638|1500004|t      |     0
-   57638|1500006|t      |     0
-(4 rows)
+(3 rows)
 
 id|value
 ---------------------------------------------------------------------


@@ -0,0 +1,233 @@
Parsed test spec with 5 sessions
starting permutation: s2-begin s2-create_distributed_table s3-create_distributed_table s2-commit
step s2-begin:
BEGIN;
step s2-create_distributed_table:
SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s3-create_distributed_table:
SELECT create_distributed_table('concurrent_table_3', 'id', colocate_with := 'concurrent_table_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s2-commit:
COMMIT;
starting permutation: s2-begin s2-create_distributed_table s1-move-shard-logical s2-commit s3-sanity-check s3-sanity-check-2
step s2-begin:
BEGIN;
step s2-create_distributed_table:
SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s1-move-shard-logical:
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638) FROM shardid;
ERROR: could not acquire the lock required to move public.concurrent_table_1
step s2-commit:
COMMIT;
step s3-sanity-check:
SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL;
count
---------------------------------------------------------------------
0
(1 row)
step s3-sanity-check-2:
SELECT count(*) FROM concurrent_table_1 JOIN concurrent_table_2 USING (id);
count
---------------------------------------------------------------------
0
(1 row)
starting permutation: s2-begin s2-create_distributed_table s1-move-shard-block s2-commit s3-sanity-check s3-sanity-check-2
step s2-begin:
BEGIN;
step s2-create_distributed_table:
SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s1-move-shard-block:
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638, 'block_writes') FROM shardid;
ERROR: could not acquire the lock required to move public.concurrent_table_1
step s2-commit:
COMMIT;
step s3-sanity-check:
SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL;
count
---------------------------------------------------------------------
0
(1 row)
step s3-sanity-check-2:
SELECT count(*) FROM concurrent_table_1 JOIN concurrent_table_2 USING (id);
count
---------------------------------------------------------------------
0
(1 row)
starting permutation: s2-begin s2-create_distributed_table s1-split-block s2-commit s3-sanity-check s3-sanity-check-2
step s2-begin:
BEGIN;
step s2-create_distributed_table:
SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s1-split-block:
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
SELECT citus_split_shard_by_split_points(
shardid.shardid, ARRAY['2113265921'], ARRAY[(SELECT * FROM first_node_id), (SELECT * FROM first_node_id)], 'block_writes') FROM shardid;
ERROR: could not acquire the lock required to split public.concurrent_table_1
step s2-commit:
COMMIT;
step s3-sanity-check:
SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL;
count
---------------------------------------------------------------------
0
(1 row)
step s3-sanity-check-2:
SELECT count(*) FROM concurrent_table_1 JOIN concurrent_table_2 USING (id);
count
---------------------------------------------------------------------
0
(1 row)
starting permutation: s4-begin s4-move-shard-logical s5-setup-rep-factor s5-create_implicit_colocated_distributed_table s4-commit s3-sanity-check s3-sanity-check-3 s3-sanity-check-4
step s4-begin:
BEGIN;
step s4-move-shard-logical:
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_4'::regclass ORDER BY shardid LIMIT 1)
SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638) FROM shardid;
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
step s5-setup-rep-factor:
SET citus.shard_replication_factor TO 1;
step s5-create_implicit_colocated_distributed_table:
SELECT create_distributed_table('concurrent_table_5', 'id');
ERROR: could not acquire the lock required to colocate distributed table public.concurrent_table_4
step s4-commit:
commit;
step s3-sanity-check:
SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL;
count
---------------------------------------------------------------------
0
(1 row)
step s3-sanity-check-3:
SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('concurrent_table_4', 'concurrent_table_5');
count
---------------------------------------------------------------------
1
(1 row)
step s3-sanity-check-4:
SELECT count(*) FROM concurrent_table_4 JOIN concurrent_table_5 USING (id);
count
---------------------------------------------------------------------
0
(1 row)
starting permutation: s4-begin s4-move-shard-block s5-setup-rep-factor s5-create_implicit_colocated_distributed_table s4-commit s3-sanity-check s3-sanity-check-3 s3-sanity-check-4
step s4-begin:
BEGIN;
step s4-move-shard-block:
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_4'::regclass ORDER BY shardid LIMIT 1)
SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638, 'block_writes') FROM shardid;
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
step s5-setup-rep-factor:
SET citus.shard_replication_factor TO 1;
step s5-create_implicit_colocated_distributed_table:
SELECT create_distributed_table('concurrent_table_5', 'id');
ERROR: could not acquire the lock required to colocate distributed table public.concurrent_table_4
step s4-commit:
commit;
step s3-sanity-check:
SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL;
count
---------------------------------------------------------------------
0
(1 row)
step s3-sanity-check-3:
SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('concurrent_table_4', 'concurrent_table_5');
count
---------------------------------------------------------------------
1
(1 row)
step s3-sanity-check-4:
SELECT count(*) FROM concurrent_table_4 JOIN concurrent_table_5 USING (id);
count
---------------------------------------------------------------------
0
(1 row)


@@ -23,12 +23,11 @@ master_copy_shard_placement
 step s1-repair-placement:
 	SELECT master_copy_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638);
-<waiting ...>
+ERROR: could not acquire the lock required to repair public.test_hash_table
 step s2-commit:
 	COMMIT;
 
-step s1-repair-placement: <... completed>
-ERROR: target placement must be in inactive state
 
 starting permutation: s2-set-placement-inactive s2-begin s2-repair-placement s1-repair-placement s2-commit
 step s2-set-placement-inactive:
@@ -47,9 +46,8 @@ master_copy_shard_placement
 step s1-repair-placement:
 	SELECT master_copy_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638);
-<waiting ...>
+ERROR: could not acquire the lock required to repair public.test_hash_table
 step s2-commit:
 	COMMIT;
 
-step s1-repair-placement: <... completed>
-ERROR: target placement must be in inactive state


@@ -594,16 +594,11 @@ step s2-move-placement:
 	SELECT master_move_shard_placement(
 		get_shard_id_for_distribution_column('logical_replicate_placement', 4),
 		'localhost', 57637, 'localhost', 57638);
-<waiting ...>
+ERROR: could not acquire the lock required to move public.logical_replicate_placement
 step s1-end:
 	COMMIT;
 
-step s2-move-placement: <... completed>
-master_move_shard_placement
----------------------------------------------------------------------
-
-(1 row)
-
 step s2-end:
 	COMMIT;


@@ -47,9 +47,3 @@ count
     0
 (1 row)
 
-run_command_on_workers
----------------------------------------------------------------------
-(localhost,57637,t,t)
-(localhost,57638,t,t)
-(2 rows)


@@ -17,12 +17,11 @@ master_move_shard_placement
 step s1-move-placement:
 	SELECT master_move_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638, 'force_logical');
-<waiting ...>
+ERROR: could not acquire the lock required to move public.test_move_table
 step s2-commit:
 	COMMIT;
 
-step s1-move-placement: <... completed>
-ERROR: source placement must be in active state
 
 step s2-print-placements:
 	SELECT
 		nodename, nodeport, count(*)
@@ -55,12 +54,11 @@ master_move_shard_placement
 step s1-move-placement:
 	SELECT master_move_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638, 'force_logical');
-<waiting ...>
+ERROR: could not acquire the lock required to move public.test_move_table
 step s2-commit:
 	COMMIT;
 
-step s1-move-placement: <... completed>
-ERROR: source placement must be in active state
 
 step s2-print-placements:
 	SELECT
 		nodename, nodeport, count(*)


@@ -97,5 +97,6 @@ test: isolation_replicated_dist_on_mx
 test: isolation_replicate_reference_tables_to_coordinator
 test: isolation_multiuser_locking
 test: isolation_acquire_distributed_locks
+test: isolation_concurrent_move_create_table
 test: isolation_check_mx


@@ -0,0 +1,137 @@
setup
{
CREATE TABLE concurrent_table_1(id int PRIMARY KEY);
CREATE TABLE concurrent_table_2(id int PRIMARY KEY);
CREATE TABLE concurrent_table_3(id int PRIMARY KEY);
CREATE TABLE concurrent_table_4(id int PRIMARY KEY);
CREATE TABLE concurrent_table_5(id int PRIMARY KEY);
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('concurrent_table_1', 'id', colocate_with := 'none');
SELECT create_distributed_table('concurrent_table_4', 'id');
SELECT nodeid INTO first_node_id FROM pg_dist_node WHERE nodeport = 57637;
}
teardown
{
DROP TABLE concurrent_table_1, concurrent_table_2, concurrent_table_3, concurrent_table_4, concurrent_table_5, first_node_id CASCADE;
}
session "s1"
step "s1-move-shard-logical"
{
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638) FROM shardid;
}
step "s1-move-shard-block"
{
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638, 'block_writes') FROM shardid;
}
step "s1-split-block"
{
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
SELECT citus_split_shard_by_split_points(
shardid.shardid, ARRAY['2113265921'], ARRAY[(SELECT * FROM first_node_id), (SELECT * FROM first_node_id)], 'block_writes') FROM shardid;
}
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-create_distributed_table"
{
SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');
}
step "s2-commit"
{
COMMIT;
}
session "s3"
step "s3-create_distributed_table"
{
SELECT create_distributed_table('concurrent_table_3', 'id', colocate_with := 'concurrent_table_1');
}
step "s3-sanity-check"
{
SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL;
}
step "s3-sanity-check-2"
{
SELECT count(*) FROM concurrent_table_1 JOIN concurrent_table_2 USING (id);
}
step "s3-sanity-check-3"
{
SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('concurrent_table_4', 'concurrent_table_5');
}
step "s3-sanity-check-4"
{
SELECT count(*) FROM concurrent_table_4 JOIN concurrent_table_5 USING (id);
}
session "s4"
step "s4-begin"
{
BEGIN;
}
step "s4-commit"
{
commit;
}
step "s4-move-shard-logical"
{
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_4'::regclass ORDER BY shardid LIMIT 1)
SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638) FROM shardid;
}
step "s4-move-shard-block"
{
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_4'::regclass ORDER BY shardid LIMIT 1)
SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638, 'block_writes') FROM shardid;
}
session "s5"
step "s5-setup-rep-factor"
{
SET citus.shard_replication_factor TO 1;
}
step "s5-create_implicit_colocated_distributed_table"
{
SELECT create_distributed_table('concurrent_table_5', 'id');
}
// concurrent create_distributed_table with the same colocation should not block each other
permutation "s2-begin" "s2-create_distributed_table" "s3-create_distributed_table" "s2-commit"
// concurrent create colocated table and shard move properly block each other, and cluster is healthy
permutation "s2-begin" "s2-create_distributed_table" "s1-move-shard-logical" "s2-commit" "s3-sanity-check" "s3-sanity-check-2"
permutation "s2-begin" "s2-create_distributed_table" "s1-move-shard-block" "s2-commit" "s3-sanity-check" "s3-sanity-check-2"
permutation "s2-begin" "s2-create_distributed_table" "s1-split-block" "s2-commit" "s3-sanity-check" "s3-sanity-check-2"
// same tests as above, but this time with implicitly colocated tables
permutation "s4-begin" "s4-move-shard-logical" "s5-setup-rep-factor" "s5-create_implicit_colocated_distributed_table" "s4-commit" "s3-sanity-check" "s3-sanity-check-3" "s3-sanity-check-4"
permutation "s4-begin" "s4-move-shard-block" "s5-setup-rep-factor" "s5-create_implicit_colocated_distributed_table" "s4-commit" "s3-sanity-check" "s3-sanity-check-3" "s3-sanity-check-4"


@@ -26,6 +26,7 @@ teardown
 {
 	SELECT run_command_on_workers('ALTER SYSTEM RESET citus.max_client_connections');
 	SELECT run_command_on_workers('SELECT pg_reload_conf()');
+	DROP TABLE my_table;
 }
 
 session "s1"