mirror of https://github.com/citusdata/citus.git
Concurrent shard move/copy and colocated table creation fix
It turns out that create_distributed_table
and citus_move/copy_shard_placement does not
work well concurrently.
To fix that, we need to acquire a lock, which
sounds like a good use of colocation lock.
However, the current usage of colocation lock is
limited to higher level UDFs like rebalance_table_shards
etc. Those usage of lock is still useful, but
we cannot acquire the same lock on citus_move_shard_placement
etc. because the coordinator connects to itself to acquire
the lock. Hence, the high level UDF blocks itself.
To fix that, we use one more colocation lock, with the placements
are the main objects to consider.
(cherry picked from commit 12fa3aaf6b
)
release-11-onder-27-july
parent
a21a4e128c
commit
9af736c7a6
|
@ -60,6 +60,7 @@
|
|||
#include "distributed/relation_access_tracking.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/shared_library_init.h"
|
||||
#include "distributed/shard_rebalancer.h"
|
||||
#include "distributed/worker_protocol.h"
|
||||
#include "distributed/worker_shard_visibility.h"
|
||||
#include "distributed/worker_transaction.h"
|
||||
|
@ -851,6 +852,17 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
|
|||
|
||||
if (colocatedTableId != InvalidOid)
|
||||
{
|
||||
/*
|
||||
* We currently allow concurrent distribution of colocated tables (which
|
||||
* we probably should not be allowing because of foreign keys /
|
||||
* partitioning etc).
|
||||
*
|
||||
* We also prevent concurrent shard moves / copy / splits) while creating
|
||||
* a colocated table.
|
||||
*/
|
||||
AcquirePlacementColocationLock(colocatedTableId, ShareLock,
|
||||
"colocate distributed table");
|
||||
|
||||
CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection);
|
||||
}
|
||||
else
|
||||
|
|
|
@ -37,6 +37,7 @@
|
|||
#include "distributed/reference_table_utils.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/shard_rebalancer.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
#include "distributed/worker_protocol.h"
|
||||
#include "distributed/worker_transaction.h"
|
||||
|
@ -179,6 +180,9 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS)
|
|||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||
ErrorIfTableCannotBeReplicated(shardInterval->relationId);
|
||||
|
||||
AcquirePlacementColocationLock(shardInterval->relationId, ExclusiveLock,
|
||||
doRepair ? "repair" : "copy");
|
||||
|
||||
if (doRepair)
|
||||
{
|
||||
RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,
|
||||
|
@ -332,6 +336,8 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
|
|||
ErrorIfMoveUnsupportedTableType(relationId);
|
||||
ErrorIfTargetNodeIsNotSafeToMove(targetNodeName, targetNodePort);
|
||||
|
||||
AcquirePlacementColocationLock(relationId, ExclusiveLock, "move");
|
||||
|
||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||
Oid distributedTableId = shardInterval->relationId;
|
||||
|
||||
|
|
|
@ -230,7 +230,7 @@ static float4 NodeCapacity(WorkerNode *workerNode, void *context);
|
|||
static ShardCost GetShardCost(uint64 shardId, void *context);
|
||||
static List * NonColocatedDistRelationIdList(void);
|
||||
static void RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid);
|
||||
static void AcquireColocationLock(Oid relationId, const char *operationName);
|
||||
static void AcquireRebalanceColocationLock(Oid relationId, const char *operationName);
|
||||
static void ExecutePlacementUpdates(List *placementUpdateList, Oid
|
||||
shardReplicationModeOid, char *noticeOperation);
|
||||
static float4 CalculateUtilization(float4 totalCost, float4 capacity);
|
||||
|
@ -619,13 +619,13 @@ GetColocatedRebalanceSteps(List *placementUpdateList)
|
|||
|
||||
|
||||
/*
|
||||
* AcquireColocationLock tries to acquire a lock for rebalance/replication. If
|
||||
* this is it not possible it fails instantly because this means another
|
||||
* rebalance/replication is currently happening. This would really mess up
|
||||
* planning.
|
||||
* AcquireRelationColocationLock tries to acquire a lock for
|
||||
* rebalance/replication. If this is it not possible it fails
|
||||
* instantly because this means another rebalance/replication
|
||||
* is currently happening. This would really mess up planning.
|
||||
*/
|
||||
static void
|
||||
AcquireColocationLock(Oid relationId, const char *operationName)
|
||||
AcquireRebalanceColocationLock(Oid relationId, const char *operationName)
|
||||
{
|
||||
uint32 lockId = relationId;
|
||||
LOCKTAG tag;
|
||||
|
@ -642,8 +642,48 @@ AcquireColocationLock(Oid relationId, const char *operationName)
|
|||
if (!lockAcquired)
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not acquire the lock required to %s %s",
|
||||
operationName, generate_qualified_relation_name(
|
||||
relationId))));
|
||||
operationName,
|
||||
generate_qualified_relation_name(relationId)),
|
||||
errdetail("It means that either a concurrent shard move "
|
||||
"or shard copy is happening."),
|
||||
errhint("Make sure that the concurrent operation has "
|
||||
"finished and re-run the command")));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AcquirePlacementColocationLock tries to acquire a lock for
|
||||
* rebalance/replication while moving/copying the placement. If this
|
||||
* is it not possible it fails instantly because this means
|
||||
* another move/copy is currently happening. This would really mess up planning.
|
||||
*/
|
||||
void
|
||||
AcquirePlacementColocationLock(Oid relationId, int lockMode,
|
||||
const char *operationName)
|
||||
{
|
||||
uint32 lockId = relationId;
|
||||
LOCKTAG tag;
|
||||
|
||||
CitusTableCacheEntry *citusTableCacheEntry = GetCitusTableCacheEntry(relationId);
|
||||
if (citusTableCacheEntry->colocationId != INVALID_COLOCATION_ID)
|
||||
{
|
||||
lockId = citusTableCacheEntry->colocationId;
|
||||
}
|
||||
|
||||
SET_LOCKTAG_REBALANCE_PLACEMENT_COLOCATION(tag, (int64) lockId);
|
||||
|
||||
LockAcquireResult lockAcquired = LockAcquire(&tag, lockMode, false, true);
|
||||
if (!lockAcquired)
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not acquire the lock required to %s %s",
|
||||
operationName,
|
||||
generate_qualified_relation_name(relationId)),
|
||||
errdetail("It means that either a concurrent shard move "
|
||||
"or colocated distributed table creation is "
|
||||
"happening."),
|
||||
errhint("Make sure that the concurrent operation has "
|
||||
"finished and re-run the command")));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -945,7 +985,7 @@ replicate_table_shards(PG_FUNCTION_ARGS)
|
|||
char transferMode = LookupShardTransferMode(shardReplicationModeOid);
|
||||
EnsureReferenceTablesExistOnAllNodesExtended(transferMode);
|
||||
|
||||
AcquireColocationLock(relationId, "replicate");
|
||||
AcquireRebalanceColocationLock(relationId, "replicate");
|
||||
|
||||
List *activeWorkerList = SortedActiveWorkers();
|
||||
List *shardPlacementList = FullShardPlacementList(relationId, excludedShardArray);
|
||||
|
@ -1558,7 +1598,7 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)
|
|||
|
||||
foreach_oid(relationId, options->relationIdList)
|
||||
{
|
||||
AcquireColocationLock(relationId, operationName);
|
||||
AcquireRebalanceColocationLock(relationId, operationName);
|
||||
}
|
||||
|
||||
List *placementUpdateList = GetRebalanceSteps(options);
|
||||
|
|
|
@ -41,7 +41,8 @@ typedef enum AdvisoryLocktagClass
|
|||
ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
|
||||
ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9,
|
||||
ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP = 10,
|
||||
ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12
|
||||
ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12,
|
||||
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13
|
||||
} AdvisoryLocktagClass;
|
||||
|
||||
/* CitusOperations has constants for citus operations */
|
||||
|
@ -92,6 +93,17 @@ typedef enum CitusOperations
|
|||
(uint32) (colocationOrTableId), \
|
||||
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION)
|
||||
|
||||
/* reuse advisory lock, but with different, unused field 4 (13)
|
||||
* Also it has the database hardcoded to MyDatabaseId, to ensure the locks
|
||||
* are local to each database */
|
||||
#define SET_LOCKTAG_REBALANCE_PLACEMENT_COLOCATION(tag, colocationOrTableId) \
|
||||
SET_LOCKTAG_ADVISORY(tag, \
|
||||
MyDatabaseId, \
|
||||
(uint32) ((colocationOrTableId) >> 32), \
|
||||
(uint32) (colocationOrTableId), \
|
||||
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION)
|
||||
|
||||
|
||||
/* advisory lock for citus operations, also it has the database hardcoded to MyDatabaseId,
|
||||
* to ensure the locks are local to each database */
|
||||
#define SET_LOCKTAG_CITUS_OPERATION(tag, operationId) \
|
||||
|
|
|
@ -194,6 +194,7 @@ extern List * RebalancePlacementUpdates(List *workerNodeList,
|
|||
extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList,
|
||||
int shardReplicationFactor);
|
||||
extern void ExecuteRebalancerCommandInSeparateTransaction(char *command);
|
||||
|
||||
extern void AcquirePlacementColocationLock(Oid relationId, int lockMode,
|
||||
const char *operationName);
|
||||
|
||||
#endif /* SHARD_REBALANCER_H */
|
||||
|
|
|
@ -0,0 +1,220 @@
|
|||
Parsed test spec with 5 sessions
|
||||
|
||||
starting permutation: s2-begin s2-create_distributed_table s3-create_distributed_table s2-commit
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-create_distributed_table:
|
||||
SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');
|
||||
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s3-create_distributed_table:
|
||||
SELECT create_distributed_table('concurrent_table_3', 'id', colocate_with := 'concurrent_table_1');
|
||||
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
|
||||
starting permutation: s2-begin s2-create_distributed_table s1-move-shard-logical s2-commit s3-sanity-check s3-sanity-check-2
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-create_distributed_table:
|
||||
SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');
|
||||
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-move-shard-logical:
|
||||
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
|
||||
SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638) FROM shardid;
|
||||
|
||||
ERROR: could not acquire the lock required to move public.concurrent_table_1
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s3-sanity-check:
|
||||
SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL;
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
step s3-sanity-check-2:
|
||||
SELECT count(*) FROM concurrent_table_1 JOIN concurrent_table_2 USING (id);
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
|
||||
starting permutation: s2-begin s2-create_distributed_table s1-move-shard-block s2-commit s3-sanity-check s3-sanity-check-2
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-create_distributed_table:
|
||||
SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');
|
||||
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-move-shard-block:
|
||||
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
|
||||
SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638, 'block_writes') FROM shardid;
|
||||
|
||||
ERROR: could not acquire the lock required to move public.concurrent_table_1
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s3-sanity-check:
|
||||
SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL;
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
step s3-sanity-check-2:
|
||||
SELECT count(*) FROM concurrent_table_1 JOIN concurrent_table_2 USING (id);
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
|
||||
starting permutation: s4-begin s4-move-shard-logical s5-setup-rep-factor s5-create_implicit_colocated_distributed_table s4-commit s3-sanity-check s3-sanity-check-3 s3-sanity-check-4
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s4-begin:
|
||||
BEGIN;
|
||||
|
||||
step s4-move-shard-logical:
|
||||
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_4'::regclass ORDER BY shardid LIMIT 1)
|
||||
SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638) FROM shardid;
|
||||
|
||||
citus_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s5-setup-rep-factor:
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
||||
step s5-create_implicit_colocated_distributed_table:
|
||||
SELECT create_distributed_table('concurrent_table_5', 'id');
|
||||
|
||||
ERROR: could not acquire the lock required to colocate distributed table public.concurrent_table_4
|
||||
step s4-commit:
|
||||
commit;
|
||||
|
||||
step s3-sanity-check:
|
||||
SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL;
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
step s3-sanity-check-3:
|
||||
SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('concurrent_table_4', 'concurrent_table_5');
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
step s3-sanity-check-4:
|
||||
SELECT count(*) FROM concurrent_table_4 JOIN concurrent_table_5 USING (id);
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
|
||||
starting permutation: s4-begin s4-move-shard-block s5-setup-rep-factor s5-create_implicit_colocated_distributed_table s4-commit s3-sanity-check s3-sanity-check-3 s3-sanity-check-4
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s4-begin:
|
||||
BEGIN;
|
||||
|
||||
step s4-move-shard-block:
|
||||
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_4'::regclass ORDER BY shardid LIMIT 1)
|
||||
SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638, 'block_writes') FROM shardid;
|
||||
|
||||
citus_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s5-setup-rep-factor:
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
||||
step s5-create_implicit_colocated_distributed_table:
|
||||
SELECT create_distributed_table('concurrent_table_5', 'id');
|
||||
|
||||
ERROR: could not acquire the lock required to colocate distributed table public.concurrent_table_4
|
||||
step s4-commit:
|
||||
commit;
|
||||
|
||||
step s3-sanity-check:
|
||||
SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL;
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
step s3-sanity-check-3:
|
||||
SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('concurrent_table_4', 'concurrent_table_5');
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
step s3-sanity-check-4:
|
||||
SELECT count(*) FROM concurrent_table_4 JOIN concurrent_table_5 USING (id);
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
|
@ -2,19 +2,19 @@ Parsed test spec with 2 sessions
|
|||
|
||||
starting permutation: s1-load-cache s2-load-cache s2-set-placement-inactive s2-begin s2-repair-placement s1-repair-placement s2-commit
|
||||
step s1-load-cache:
|
||||
COPY test_hash_table FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV;
|
||||
COPY test_hash_table FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV;
|
||||
|
||||
step s2-load-cache:
|
||||
COPY test_hash_table FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV;
|
||||
COPY test_hash_table FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV;
|
||||
|
||||
step s2-set-placement-inactive:
|
||||
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard_for_test_table) AND nodeport = 57638;
|
||||
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard_for_test_table) AND nodeport = 57638;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
BEGIN;
|
||||
|
||||
step s2-repair-placement:
|
||||
SELECT master_copy_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638);
|
||||
SELECT master_copy_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638);
|
||||
|
||||
master_copy_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
@ -22,23 +22,22 @@ master_copy_shard_placement
|
|||
(1 row)
|
||||
|
||||
step s1-repair-placement:
|
||||
SELECT master_copy_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638);
|
||||
<waiting ...>
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
SELECT master_copy_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638);
|
||||
|
||||
ERROR: could not acquire the lock required to repair public.test_hash_table
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-repair-placement: <... completed>
|
||||
ERROR: target placement must be in inactive state
|
||||
|
||||
starting permutation: s2-set-placement-inactive s2-begin s2-repair-placement s1-repair-placement s2-commit
|
||||
step s2-set-placement-inactive:
|
||||
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard_for_test_table) AND nodeport = 57638;
|
||||
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid IN (SELECT * FROM selected_shard_for_test_table) AND nodeport = 57638;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
BEGIN;
|
||||
|
||||
step s2-repair-placement:
|
||||
SELECT master_copy_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638);
|
||||
SELECT master_copy_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638);
|
||||
|
||||
master_copy_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
@ -46,10 +45,9 @@ master_copy_shard_placement
|
|||
(1 row)
|
||||
|
||||
step s1-repair-placement:
|
||||
SELECT master_copy_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638);
|
||||
<waiting ...>
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
SELECT master_copy_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638);
|
||||
|
||||
ERROR: could not acquire the lock required to repair public.test_hash_table
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-repair-placement: <... completed>
|
||||
ERROR: target placement must be in inactive state
|
||||
|
|
|
@ -10,10 +10,10 @@ pg_advisory_lock
|
|||
(1 row)
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
BEGIN;
|
||||
|
||||
step s1-move-placement:
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
<waiting ...>
|
||||
step s2-insert:
|
||||
INSERT INTO logical_replicate_placement VALUES (15, 15);
|
||||
|
@ -33,7 +33,7 @@ master_move_shard_placement
|
|||
(1 row)
|
||||
|
||||
step s1-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
step s1-select:
|
||||
SELECT * FROM logical_replicate_placement order by y;
|
||||
|
@ -62,10 +62,10 @@ pg_advisory_lock
|
|||
(1 row)
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
BEGIN;
|
||||
|
||||
step s1-move-placement:
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
<waiting ...>
|
||||
step s2-upsert:
|
||||
INSERT INTO logical_replicate_placement VALUES (15, 15);
|
||||
|
@ -86,7 +86,7 @@ master_move_shard_placement
|
|||
(1 row)
|
||||
|
||||
step s1-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
step s1-select:
|
||||
SELECT * FROM logical_replicate_placement order by y;
|
||||
|
@ -118,10 +118,10 @@ pg_advisory_lock
|
|||
(1 row)
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
BEGIN;
|
||||
|
||||
step s1-move-placement:
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
<waiting ...>
|
||||
step s2-update:
|
||||
UPDATE logical_replicate_placement SET y = y + 1 WHERE x = 15;
|
||||
|
@ -141,7 +141,7 @@ master_move_shard_placement
|
|||
(1 row)
|
||||
|
||||
step s1-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
step s1-select:
|
||||
SELECT * FROM logical_replicate_placement order by y;
|
||||
|
@ -173,10 +173,10 @@ pg_advisory_lock
|
|||
(1 row)
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
BEGIN;
|
||||
|
||||
step s1-move-placement:
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
<waiting ...>
|
||||
step s2-delete:
|
||||
DELETE FROM logical_replicate_placement WHERE x = 15;
|
||||
|
@ -196,7 +196,7 @@ master_move_shard_placement
|
|||
(1 row)
|
||||
|
||||
step s1-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
step s1-select:
|
||||
SELECT * FROM logical_replicate_placement order by y;
|
||||
|
@ -227,10 +227,10 @@ pg_advisory_lock
|
|||
(1 row)
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
BEGIN;
|
||||
|
||||
step s1-move-placement:
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
<waiting ...>
|
||||
step s2-select:
|
||||
SELECT * FROM logical_replicate_placement ORDER BY y;
|
||||
|
@ -255,7 +255,7 @@ master_move_shard_placement
|
|||
(1 row)
|
||||
|
||||
step s1-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
step s1-get-shard-distribution:
|
||||
select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 AND shardid in (SELECT * FROM selected_shard) order by nodeport;
|
||||
|
@ -279,10 +279,10 @@ pg_advisory_lock
|
|||
(1 row)
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
BEGIN;
|
||||
|
||||
step s1-move-placement:
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
<waiting ...>
|
||||
step s2-select-for-update:
|
||||
SELECT * FROM logical_replicate_placement WHERE x=15 FOR UPDATE;
|
||||
|
@ -307,7 +307,7 @@ master_move_shard_placement
|
|||
(1 row)
|
||||
|
||||
step s1-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
step s1-get-shard-distribution:
|
||||
select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 AND shardid in (SELECT * FROM selected_shard) order by nodeport;
|
||||
|
@ -320,7 +320,7 @@ nodeport
|
|||
|
||||
starting permutation: s1-begin s2-begin s2-insert s1-move-placement s2-end s1-end s1-select s1-get-shard-distribution
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
@ -329,10 +329,10 @@ step s2-insert:
|
|||
INSERT INTO logical_replicate_placement VALUES (15, 15);
|
||||
|
||||
step s1-move-placement:
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
<waiting ...>
|
||||
step s2-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
step s1-move-placement: <... completed>
|
||||
master_move_shard_placement
|
||||
|
@ -341,7 +341,7 @@ master_move_shard_placement
|
|||
(1 row)
|
||||
|
||||
step s1-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
step s1-select:
|
||||
SELECT * FROM logical_replicate_placement order by y;
|
||||
|
@ -362,7 +362,7 @@ nodeport
|
|||
|
||||
starting permutation: s1-begin s2-begin s2-upsert s1-move-placement s2-end s1-end s1-select s1-get-shard-distribution
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
@ -372,10 +372,10 @@ step s2-upsert:
|
|||
INSERT INTO logical_replicate_placement VALUES (15, 15) ON CONFLICT (x) DO UPDATE SET y = logical_replicate_placement.y + 1;
|
||||
|
||||
step s1-move-placement:
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
<waiting ...>
|
||||
step s2-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
step s1-move-placement: <... completed>
|
||||
master_move_shard_placement
|
||||
|
@ -384,7 +384,7 @@ master_move_shard_placement
|
|||
(1 row)
|
||||
|
||||
step s1-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
step s1-select:
|
||||
SELECT * FROM logical_replicate_placement order by y;
|
||||
|
@ -408,7 +408,7 @@ step s1-insert:
|
|||
INSERT INTO logical_replicate_placement VALUES (15, 15);
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
@ -417,10 +417,10 @@ step s2-update:
|
|||
UPDATE logical_replicate_placement SET y = y + 1 WHERE x = 15;
|
||||
|
||||
step s1-move-placement:
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
<waiting ...>
|
||||
step s2-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
step s1-move-placement: <... completed>
|
||||
master_move_shard_placement
|
||||
|
@ -429,7 +429,7 @@ master_move_shard_placement
|
|||
(1 row)
|
||||
|
||||
step s1-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
step s1-select:
|
||||
SELECT * FROM logical_replicate_placement order by y;
|
||||
|
@ -453,7 +453,7 @@ step s1-insert:
|
|||
INSERT INTO logical_replicate_placement VALUES (15, 15);
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
@ -462,10 +462,10 @@ step s2-delete:
|
|||
DELETE FROM logical_replicate_placement WHERE x = 15;
|
||||
|
||||
step s1-move-placement:
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
<waiting ...>
|
||||
step s2-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
step s1-move-placement: <... completed>
|
||||
master_move_shard_placement
|
||||
|
@ -474,7 +474,7 @@ master_move_shard_placement
|
|||
(1 row)
|
||||
|
||||
step s1-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
step s1-select:
|
||||
SELECT * FROM logical_replicate_placement order by y;
|
||||
|
@ -497,7 +497,7 @@ step s1-insert:
|
|||
INSERT INTO logical_replicate_placement VALUES (15, 15);
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
@ -511,7 +511,7 @@ step s2-select:
|
|||
(1 row)
|
||||
|
||||
step s1-move-placement:
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
@ -519,10 +519,10 @@ master_move_shard_placement
|
|||
(1 row)
|
||||
|
||||
step s2-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
step s1-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
step s1-get-shard-distribution:
|
||||
select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 AND shardid in (SELECT * FROM selected_shard) order by nodeport;
|
||||
|
@ -538,7 +538,7 @@ step s1-insert:
|
|||
INSERT INTO logical_replicate_placement VALUES (15, 15);
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
@ -552,10 +552,10 @@ step s2-select-for-update:
|
|||
(1 row)
|
||||
|
||||
step s1-move-placement:
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
<waiting ...>
|
||||
step s2-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
step s1-move-placement: <... completed>
|
||||
master_move_shard_placement
|
||||
|
@ -564,7 +564,7 @@ master_move_shard_placement
|
|||
(1 row)
|
||||
|
||||
step s1-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
step s1-get-shard-distribution:
|
||||
select nodeport from pg_dist_placement inner join pg_dist_node on(pg_dist_placement.groupid = pg_dist_node.groupid) where shardstate != 4 AND shardid in (SELECT * FROM selected_shard) order by nodeport;
|
||||
|
@ -577,13 +577,13 @@ nodeport
|
|||
|
||||
starting permutation: s1-begin s2-begin s1-move-placement s2-move-placement s1-end s2-end
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-move-placement:
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
@ -591,19 +591,14 @@ master_move_shard_placement
|
|||
(1 row)
|
||||
|
||||
step s2-move-placement:
|
||||
SELECT master_move_shard_placement(
|
||||
get_shard_id_for_distribution_column('logical_replicate_placement', 4),
|
||||
'localhost', 57637, 'localhost', 57638);
|
||||
<waiting ...>
|
||||
step s1-end:
|
||||
COMMIT;
|
||||
SELECT master_move_shard_placement(
|
||||
get_shard_id_for_distribution_column('logical_replicate_placement', 4),
|
||||
'localhost', 57637, 'localhost', 57638);
|
||||
|
||||
step s2-move-placement: <... completed>
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
ERROR: could not acquire the lock required to move public.logical_replicate_placement
|
||||
step s1-end:
|
||||
COMMIT;
|
||||
|
||||
step s2-end:
|
||||
COMMIT;
|
||||
COMMIT;
|
||||
|
||||
|
|
|
@ -8,7 +8,7 @@ run_command_on_workers
|
|||
(2 rows)
|
||||
|
||||
step s1-grant:
|
||||
SELECT result FROM run_command_on_placements('my_table', 'GRANT SELECT ON TABLE %s TO my_user');
|
||||
SELECT result FROM run_command_on_placements('my_table', 'GRANT SELECT ON TABLE %s TO my_user');
|
||||
|
||||
result
|
||||
---------------------------------------------------------------------
|
||||
|
@ -19,7 +19,7 @@ GRANT
|
|||
(4 rows)
|
||||
|
||||
step s1-connect:
|
||||
SELECT make_external_connection_to_node('localhost', 57637, 'my_user', current_database());
|
||||
SELECT make_external_connection_to_node('localhost', 57637, 'my_user', current_database());
|
||||
|
||||
make_external_connection_to_node
|
||||
---------------------------------------------------------------------
|
||||
|
@ -27,11 +27,11 @@ make_external_connection_to_node
|
|||
(1 row)
|
||||
|
||||
step s2-connect:
|
||||
SELECT make_external_connection_to_node('localhost', 57637, 'my_user', current_database());
|
||||
SELECT make_external_connection_to_node('localhost', 57637, 'my_user', current_database());
|
||||
|
||||
ERROR: connection failed
|
||||
step s2-connect-superuser:
|
||||
SELECT make_external_connection_to_node('localhost', 57637, 'postgres', current_database());
|
||||
SELECT make_external_connection_to_node('localhost', 57637, 'postgres', current_database());
|
||||
|
||||
make_external_connection_to_node
|
||||
---------------------------------------------------------------------
|
||||
|
@ -39,17 +39,11 @@ make_external_connection_to_node
|
|||
(1 row)
|
||||
|
||||
step s3-select:
|
||||
SET ROLE my_user;
|
||||
SELECT count(*) FROM my_table;
|
||||
SET ROLE my_user;
|
||||
SELECT count(*) FROM my_table;
|
||||
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,t)
|
||||
(localhost,57638,t,t)
|
||||
(2 rows)
|
||||
|
||||
|
|
|
@ -2,13 +2,13 @@ Parsed test spec with 2 sessions
|
|||
|
||||
starting permutation: s1-load-cache s2-begin s2-move-placement s1-move-placement s2-commit s2-print-placements
|
||||
step s1-load-cache:
|
||||
COPY test_move_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
|
||||
COPY test_move_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
BEGIN;
|
||||
|
||||
step s2-move-placement:
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638, 'force_logical');
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638, 'force_logical');
|
||||
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
@ -16,24 +16,23 @@ master_move_shard_placement
|
|||
(1 row)
|
||||
|
||||
step s1-move-placement:
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638, 'force_logical');
|
||||
<waiting ...>
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638, 'force_logical');
|
||||
|
||||
ERROR: could not acquire the lock required to move public.test_move_table
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-move-placement: <... completed>
|
||||
ERROR: source placement must be in active state
|
||||
step s2-print-placements:
|
||||
SELECT
|
||||
nodename, nodeport, count(*)
|
||||
FROM
|
||||
pg_dist_shard_placement
|
||||
WHERE
|
||||
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_move_table'::regclass)
|
||||
AND
|
||||
shardstate = 1
|
||||
GROUP BY
|
||||
nodename, nodeport;
|
||||
SELECT
|
||||
nodename, nodeport, count(*)
|
||||
FROM
|
||||
pg_dist_shard_placement
|
||||
WHERE
|
||||
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_move_table'::regclass)
|
||||
AND
|
||||
shardstate = 1
|
||||
GROUP BY
|
||||
nodename, nodeport;
|
||||
|
||||
nodename |nodeport|count
|
||||
---------------------------------------------------------------------
|
||||
|
@ -43,10 +42,10 @@ localhost| 57638| 2
|
|||
|
||||
starting permutation: s2-begin s2-move-placement s1-move-placement s2-commit s2-print-placements
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
BEGIN;
|
||||
|
||||
step s2-move-placement:
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638, 'force_logical');
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638, 'force_logical');
|
||||
|
||||
master_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
@ -54,24 +53,23 @@ master_move_shard_placement
|
|||
(1 row)
|
||||
|
||||
step s1-move-placement:
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638, 'force_logical');
|
||||
<waiting ...>
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
SELECT master_move_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638, 'force_logical');
|
||||
|
||||
ERROR: could not acquire the lock required to move public.test_move_table
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-move-placement: <... completed>
|
||||
ERROR: source placement must be in active state
|
||||
step s2-print-placements:
|
||||
SELECT
|
||||
nodename, nodeport, count(*)
|
||||
FROM
|
||||
pg_dist_shard_placement
|
||||
WHERE
|
||||
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_move_table'::regclass)
|
||||
AND
|
||||
shardstate = 1
|
||||
GROUP BY
|
||||
nodename, nodeport;
|
||||
SELECT
|
||||
nodename, nodeport, count(*)
|
||||
FROM
|
||||
pg_dist_shard_placement
|
||||
WHERE
|
||||
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_move_table'::regclass)
|
||||
AND
|
||||
shardstate = 1
|
||||
GROUP BY
|
||||
nodename, nodeport;
|
||||
|
||||
nodename |nodeport|count
|
||||
---------------------------------------------------------------------
|
||||
|
|
|
@ -95,5 +95,6 @@ test: isolation_replicated_dist_on_mx
|
|||
test: isolation_replicate_reference_tables_to_coordinator
|
||||
test: isolation_multiuser_locking
|
||||
test: isolation_acquire_distributed_locks
|
||||
test: isolation_concurrent_move_create_table
|
||||
|
||||
test: isolation_check_mx
|
||||
|
|
|
@ -0,0 +1,126 @@
|
|||
setup
|
||||
{
|
||||
CREATE TABLE concurrent_table_1(id int PRIMARY KEY);
|
||||
CREATE TABLE concurrent_table_2(id int PRIMARY KEY);
|
||||
CREATE TABLE concurrent_table_3(id int PRIMARY KEY);
|
||||
CREATE TABLE concurrent_table_4(id int PRIMARY KEY);
|
||||
CREATE TABLE concurrent_table_5(id int PRIMARY KEY);
|
||||
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SELECT create_distributed_table('concurrent_table_1', 'id', colocate_with := 'none');
|
||||
SELECT create_distributed_table('concurrent_table_4', 'id');
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
DROP TABLE concurrent_table_1, concurrent_table_2, concurrent_table_3, concurrent_table_4, concurrent_table_5 CASCADE;
|
||||
}
|
||||
|
||||
session "s1"
|
||||
|
||||
|
||||
step "s1-move-shard-logical"
|
||||
{
|
||||
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
|
||||
SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638) FROM shardid;
|
||||
}
|
||||
|
||||
step "s1-move-shard-block"
|
||||
{
|
||||
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_1'::regclass ORDER BY shardid LIMIT 1)
|
||||
SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638, 'block_writes') FROM shardid;
|
||||
}
|
||||
|
||||
session "s2"
|
||||
|
||||
step "s2-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
step "s2-create_distributed_table"
|
||||
{
|
||||
SELECT create_distributed_table('concurrent_table_2', 'id', colocate_with := 'concurrent_table_1');
|
||||
}
|
||||
|
||||
step "s2-commit"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
session "s3"
|
||||
|
||||
step "s3-create_distributed_table"
|
||||
{
|
||||
SELECT create_distributed_table('concurrent_table_3', 'id', colocate_with := 'concurrent_table_1');
|
||||
}
|
||||
|
||||
step "s3-sanity-check"
|
||||
{
|
||||
SELECT count(*) FROM pg_dist_shard LEFT JOIN pg_dist_shard_placement USING(shardid) WHERE nodename IS NULL;
|
||||
}
|
||||
|
||||
step "s3-sanity-check-2"
|
||||
{
|
||||
SELECT count(*) FROM concurrent_table_1 JOIN concurrent_table_2 USING (id);
|
||||
}
|
||||
|
||||
step "s3-sanity-check-3"
|
||||
{
|
||||
SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('concurrent_table_4', 'concurrent_table_5');
|
||||
}
|
||||
|
||||
step "s3-sanity-check-4"
|
||||
{
|
||||
SELECT count(*) FROM concurrent_table_4 JOIN concurrent_table_5 USING (id);
|
||||
}
|
||||
|
||||
|
||||
session "s4"
|
||||
|
||||
step "s4-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
step "s4-commit"
|
||||
{
|
||||
commit;
|
||||
}
|
||||
|
||||
step "s4-move-shard-logical"
|
||||
{
|
||||
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_4'::regclass ORDER BY shardid LIMIT 1)
|
||||
SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638) FROM shardid;
|
||||
}
|
||||
|
||||
step "s4-move-shard-block"
|
||||
{
|
||||
WITH shardid AS (SELECT shardid FROM pg_dist_shard where logicalrelid = 'concurrent_table_4'::regclass ORDER BY shardid LIMIT 1)
|
||||
SELECT citus_move_Shard_placement(shardid.shardid, 'localhost', 57637, 'localhost', 57638, 'block_writes') FROM shardid;
|
||||
}
|
||||
|
||||
session "s5"
|
||||
|
||||
step "s5-setup-rep-factor"
|
||||
{
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
}
|
||||
|
||||
step "s5-create_implicit_colocated_distributed_table"
|
||||
{
|
||||
SELECT create_distributed_table('concurrent_table_5', 'id');
|
||||
}
|
||||
|
||||
|
||||
//concurrent create_distributed_table with the same colocation should not block each other
|
||||
permutation "s2-begin" "s2-create_distributed_table" "s3-create_distributed_table" "s2-commit"
|
||||
|
||||
// concurrent create colocated table and shard move properly block each other, and cluster is healthy
|
||||
permutation "s2-begin" "s2-create_distributed_table" "s1-move-shard-logical" "s2-commit" "s3-sanity-check" "s3-sanity-check-2"
|
||||
permutation "s2-begin" "s2-create_distributed_table" "s1-move-shard-block" "s2-commit" "s3-sanity-check" "s3-sanity-check-2"
|
||||
|
||||
// same test above, but this time implicitly colocated tables
|
||||
permutation "s4-begin" "s4-move-shard-logical" "s5-setup-rep-factor" "s5-create_implicit_colocated_distributed_table" "s4-commit" "s3-sanity-check" "s3-sanity-check-3" "s3-sanity-check-4"
|
||||
permutation "s4-begin" "s4-move-shard-block" "s5-setup-rep-factor" "s5-create_implicit_colocated_distributed_table" "s4-commit" "s3-sanity-check" "s3-sanity-check-3" "s3-sanity-check-4"
|
||||
|
|
@ -26,6 +26,7 @@ teardown
|
|||
{
|
||||
SELECT run_command_on_workers('ALTER SYSTEM RESET citus.max_client_connections');
|
||||
SELECT run_command_on_workers('SELECT pg_reload_conf()');
|
||||
DROP TABLE my_table;
|
||||
}
|
||||
|
||||
session "s1"
|
||||
|
|
Loading…
Reference in New Issue