diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index 3b2832641..fd7c4d57e 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -3951,6 +3951,13 @@ citus_internal_shard_group_set_needsisolatednode(PG_FUNCTION_ARGS)
 	PG_ENSURE_ARGNOTNULL(1, "enabled");
 	bool enabled = PG_GETARG_BOOL(1);
 
+	/* only the owner of the table (or a superuser) is allowed to modify the Citus metadata */
+	Oid distributedRelationId = RelationIdForShard(shardId);
+	EnsureTableOwner(distributedRelationId);
+
+	/* we want to serialize all the metadata changes to this table */
+	LockRelationOid(distributedRelationId, ShareUpdateExclusiveLock);
+
 	if (!ShouldSkipMetadataChecks())
 	{
 		EnsureCoordinatorInitiatedOperation();
diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c
index 3c8219293..5f48bbd4a 100644
--- a/src/backend/distributed/metadata/metadata_utility.c
+++ b/src/backend/distributed/metadata/metadata_utility.c
@@ -63,6 +63,7 @@
 #include "distributed/resource_lock.h"
 #include "distributed/remote_commands.h"
 #include "distributed/shard_rebalancer.h"
+#include "distributed/shard_transfer.h"
 #include "distributed/tuplestore.h"
 #include "distributed/utils/array_type.h"
 #include "distributed/worker_manager.h"
@@ -114,7 +115,7 @@ static void AppendShardIdNameValues(StringInfo selectQuery, ShardInterval *shard
 static HeapTuple CreateDiskSpaceTuple(TupleDesc tupleDesc, uint64 availableBytes,
									   uint64 totalBytes);
 static bool GetLocalDiskSpaceStats(uint64 *availableBytes, uint64 *totalBytes);
-static void ErrorIfShardIsolationNotPossible(uint64 shardId);
+static void citus_shard_property_set_anti_affinity(uint64 shardId, bool enabled);
 static void ShardGroupSetNeedsIsolatedNodeGlobally(uint64 shardId, bool enabled);
 static void ShardSetNeedsIsolatedNode(uint64 shardId, bool enabled);
 static BackgroundTask * DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor,
@@ -372,12 +373,28 @@ citus_shard_property_set(PG_FUNCTION_ARGS)
 	PG_ENSURE_ARGNOTNULL(0, "shard_id");
 	uint64 shardId = PG_GETARG_INT64(0);
 
+	if (!ShardExists(shardId))
+	{
+		ereport(ERROR, (errmsg("shard %lu does not exist", shardId)));
+	}
+
+	Oid distributedRelationId = RelationIdForShard(shardId);
+	List *colocatedTableList = ColocatedTableList(distributedRelationId);
+	EnsureTableListOwner(colocatedTableList);
+
+	AcquirePlacementColocationLock(distributedRelationId, ExclusiveLock,
+								   "set anti affinity property for a shard of");
+
+	Oid colocatedTableId = InvalidOid;
+	foreach_oid(colocatedTableId, colocatedTableList)
+	{
+		LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock);
+	}
+
 	if (!PG_ARGISNULL(1))
 	{
-		ErrorIfShardIsolationNotPossible(shardId);
-
-		bool enabled = PG_GETARG_BOOL(1);
-		ShardGroupSetNeedsIsolatedNodeGlobally(shardId, enabled);
+		bool antiAffinity = PG_GETARG_BOOL(1);
+		citus_shard_property_set_anti_affinity(shardId, antiAffinity);
 	}
 
 	PG_RETURN_VOID();
@@ -385,17 +402,13 @@ citus_shard_property_set(PG_FUNCTION_ARGS)
 
 
 /*
- * ErrorIfShardIsolationNotPossible throws an error if shard isolation is not
- * possible for the given shard.
+ * citus_shard_property_set_anti_affinity is a helper function for the
+ * citus_shard_property_set UDF to set the anti_affinity property for the
+ * given shard.
*/ static void -ErrorIfShardIsolationNotPossible(uint64 shardId) +citus_shard_property_set_anti_affinity(uint64 shardId, bool enabled) { - if (!ShardExists(shardId)) - { - ereport(ERROR, (errmsg("shard %lu does not exist", shardId))); - } - Oid distributedRelationId = RelationIdForShard(shardId); if (!IsCitusTableType(distributedRelationId, HASH_DISTRIBUTED) && !IsCitusTableType(distributedRelationId, SINGLE_SHARD_DISTRIBUTED)) @@ -403,6 +416,8 @@ ErrorIfShardIsolationNotPossible(uint64 shardId) ereport(ERROR, (errmsg("shard isolation is only supported for hash " "distributed tables"))); } + + ShardGroupSetNeedsIsolatedNodeGlobally(shardId, enabled); } diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index 23925a315..d8efa8b8a 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -120,7 +120,6 @@ static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNodePort, const char *targetNodeName, int32 targetNodePort); static List * RecreateTableDDLCommandList(Oid relationId); -static void EnsureTableListOwner(List *tableIdList); static void ErrorIfReplicatingDistributedTableWithFKeys(List *tableIdList); static void DropShardPlacementsFromMetadata(List *shardList, @@ -152,7 +151,6 @@ static bool TransferAlreadyCompleted(List *colocatedShardList, char *sourceNodeName, uint32 sourceNodePort, char *targetNodeName, uint32 targetNodePort, ShardTransferType transferType); -static void LockColocatedRelationsForMove(List *colocatedTableList); static void ErrorIfForeignTableForShardTransfer(List *colocatedTableList, ShardTransferType transferType); static List * RecreateShardDDLCommandList(ShardInterval *shardInterval, @@ -667,7 +665,7 @@ IsShardListOnNode(List *colocatedShardList, char *targetNodeName, uint32 targetN * LockColocatedRelationsForMove takes a list of relations, locks all of them * using ShareUpdateExclusiveLock */ -static void +void LockColocatedRelationsForMove(List *colocatedTableList) { Oid colocatedTableId = InvalidOid; @@ -1275,7 +1273,7 @@ LookupShardTransferMode(Oid shardReplicationModeOid) * EnsureTableListOwner ensures current user owns given tables. Superusers * are regarded as owners. 
*/ -static void +void EnsureTableListOwner(List *tableIdList) { Oid tableId = InvalidOid; diff --git a/src/include/distributed/shard_transfer.h b/src/include/distributed/shard_transfer.h index a6d024a2e..5d3f46674 100644 --- a/src/include/distributed/shard_transfer.h +++ b/src/include/distributed/shard_transfer.h @@ -29,6 +29,7 @@ extern void TransferShards(int64 shardId, extern uint64 ShardListSizeInBytes(List *colocatedShardList, char *workerNodeName, uint32 workerNodePort); extern void ErrorIfMoveUnsupportedTableType(Oid relationId); +extern void EnsureTableListOwner(List *tableIdList); extern void CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardIntervalList, char *snapshotName); extern void VerifyTablesHaveReplicaIdentity(List *colocatedTableList); @@ -40,3 +41,4 @@ extern void UpdatePlacementUpdateStatusForShardIntervalList(List *shardIntervalL extern void InsertDeferredDropCleanupRecordsForShards(List *shardIntervalList); extern void InsertCleanupRecordsForShardPlacementsOnNode(List *shardIntervalList, int32 groupId); +extern void LockColocatedRelationsForMove(List *colocatedTableList); diff --git a/src/test/regress/expected/isolate_placement.out b/src/test/regress/expected/isolate_placement.out index 4c86e2455..813d2aea6 100644 --- a/src/test/regress/expected/isolate_placement.out +++ b/src/test/regress/expected/isolate_placement.out @@ -14,9 +14,19 @@ SELECT citus_internal_shard_group_set_needsisolatednode(0, NULL); ERROR: enabled cannot be NULL SELECT citus_internal_shard_group_set_needsisolatednode(NULL, false); ERROR: shard_id cannot be NULL +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 2000000; +CREATE TABLE single_shard_1(a int); +SELECT create_distributed_table('single_shard_1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + -- test with user that doesn't have permission to execute the function -SELECT citus_internal_shard_group_set_needsisolatednode(0, true); +SELECT citus_internal_shard_group_set_needsisolatednode(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; ERROR: This is an internal Citus function can only be used in a distributed transaction +DROP TABLE single_shard_1; CREATE ROLE test_user_isolate_placement WITH LOGIN; GRANT ALL ON SCHEMA isolate_placement TO test_user_isolate_placement; ALTER SYSTEM SET citus.enable_manual_metadata_changes_for_user TO 'test_user_isolate_placement'; @@ -62,7 +72,110 @@ SELECT pg_sleep(0.1); (1 row) SET search_path TO isolate_placement; -SET citus.next_shard_id TO 2000000; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 2001000; +CREATE USER mysuperuser superuser; +SET ROLE mysuperuser; +CREATE TABLE single_shard_1(a int); +SELECT create_distributed_table('single_shard_1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE USER regularuser; +GRANT USAGE ON SCHEMA isolate_placement TO regularuser; +ALTER SYSTEM SET citus.enable_manual_metadata_changes_for_user TO 'regularuser'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SET ROLE regularuser; +-- throws an error as the user is not the owner of the table +SELECT 
citus_shard_property_set(shardid) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; +ERROR: must be owner of table single_shard_1 +SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; +ERROR: must be owner of table single_shard_1 +SELECT citus_internal_shard_group_set_needsisolatednode(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; +ERROR: must be owner of table single_shard_1 +-- assign all tables to regularuser +RESET ROLE; +SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY mysuperuser TO regularuser; $$); + result +--------------------------------------------------------------------- + REASSIGN OWNED + REASSIGN OWNED + REASSIGN OWNED +(3 rows) + +SET ROLE regularuser; +SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; + citus_shard_property_set +--------------------------------------------------------------------- + +(1 row) + +SELECT result FROM run_command_on_all_nodes($$ + SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.single_shard_1') +$$) +ORDER BY result; + result +--------------------------------------------------------------------- + [{"1": [{"isolate_placement.single_shard_1": true}]}] + [{"1": [{"isolate_placement.single_shard_1": true}]}] + [{"1": [{"isolate_placement.single_shard_1": true}]}] +(3 rows) + +SELECT citus_internal_shard_group_set_needsisolatednode(shardid, false) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; + citus_internal_shard_group_set_needsisolatednode +--------------------------------------------------------------------- + +(1 row) + +SELECT result FROM run_command_on_all_nodes($$ + SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.single_shard_1') +$$) +ORDER BY result; + result +--------------------------------------------------------------------- + [{"1": [{"isolate_placement.single_shard_1": true}]}] + [{"1": [{"isolate_placement.single_shard_1": true}]}] + {} +(3 rows) + +SELECT citus_internal_shard_group_set_needsisolatednode(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; + citus_internal_shard_group_set_needsisolatednode +--------------------------------------------------------------------- + +(1 row) + +DROP TABLE single_shard_1; +RESET ROLE; +REVOKE USAGE ON SCHEMA isolate_placement FROM regularuser; +ALTER SYSTEM RESET citus.enable_manual_metadata_changes_for_user; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +DROP ROLE regularuser, mysuperuser; +SET search_path TO isolate_placement; +SET citus.next_shard_id TO 2002000; SET citus.shard_count TO 32; SET citus.shard_replication_factor TO 1; SET client_min_messages TO WARNING; @@ -100,7 +213,7 @@ SET citus.shard_replication_factor TO 1; SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- {} @@ -129,7 +242,7 @@ SELECT 
citus_shard_property_set(:shardgroup_10_shardid, anti_affinity=>true); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- [{"5": [{"isolate_placement.dist_1": true}, {"isolate_placement.dist_2": true}, {"isolate_placement.dist_3": true}]}, {"10": [{"isolate_placement.dist_1": true}, {"isolate_placement.dist_2": true}, {"isolate_placement.dist_3": true}]}] @@ -149,7 +262,7 @@ SELECT citus_shard_property_set(:shardgroup_3_shardid, anti_affinity=>false); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- [{"5": [{"isolate_placement.dist_1": true}, {"isolate_placement.dist_2": true}, {"isolate_placement.dist_3": true}]}, {"10": [{"isolate_placement.dist_1": true}, {"isolate_placement.dist_2": true}, {"isolate_placement.dist_3": true}]}] @@ -169,7 +282,7 @@ SELECT citus_shard_property_set(:shardgroup_10_shardid, anti_affinity=>false); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- [{"5": [{"isolate_placement.dist_1": true}, {"isolate_placement.dist_2": true}, {"isolate_placement.dist_3": true}]}] @@ -189,7 +302,7 @@ SELECT citus_shard_property_set(:shardgroup_5_shardid, anti_affinity=>true); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- [{"5": [{"isolate_placement.dist_1": true}, {"isolate_placement.dist_2": true}, {"isolate_placement.dist_3": true}]}] @@ -245,7 +358,7 @@ SELECT citus_shard_property_set(:shardgroup_5_shardid, anti_affinity=>true); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- [{"5": [{"isolate_placement.dist_1": true}, {"isolate_placement.dist_2": true}, {"isolate_placement.dist_3": true}]}] @@ -274,7 +387,7 @@ DETAIL: UPDATE and DELETE commands on the relation will error out during create SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- [{"5": [{"isolate_placement.dist_1": true}, {"isolate_placement.dist_2": true}, {"isolate_placement.dist_3": true}, {"isolate_placement.dist_4": true}, {"isolate_placement.dist_4_concurrently": true}]}] @@ -355,7 +468,7 @@ FROM get_candidate_node_for_shard_transfer(:shardgroup_15_shardid); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- [{"5": [{"isolate_placement.dist_1": true}, {"isolate_placement.dist_2": true}, {"isolate_placement.dist_3": true}]}] 
@@ -390,7 +503,7 @@ SELECT citus_shard_property_set(:shardgroup_3_shardid, anti_affinity=>true); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- [{"3": [{"isolate_placement.dist_1": true}, {"isolate_placement.dist_2": true}]}] @@ -411,7 +524,7 @@ SET client_min_messages TO NOTICE; SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- [{"3": [{"isolate_placement.dist_1": true}, {"isolate_placement.dist_2": true}]}] @@ -446,7 +559,7 @@ SELECT citus_shard_property_set(:shardgroup_9_shardid, anti_affinity=>true); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- [{"9": [{"isolate_placement.dist_1": true}, {"isolate_placement.dist_2": true}]}] @@ -473,7 +586,7 @@ WHERE shardid = :shardgroup_9_shardid; SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- {} @@ -493,7 +606,7 @@ SELECT citus_shard_property_set(:shardgroup_12_shardid, anti_affinity=>true); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- [{"12": [{"isolate_placement.dist_1": true}, {"isolate_placement.dist_2": true}]}] @@ -521,7 +634,7 @@ WHERE shardid = :shardgroup_10_shardid; SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- [{"13": [{"isolate_placement.dist_1": true}, {"isolate_placement.dist_2": true}]}] @@ -548,7 +661,7 @@ SELECT citus_shard_property_set(:shardgroup_17_shardid, anti_affinity=>true); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_3') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- [{"17": [{"isolate_placement.dist_3": true}]}] @@ -574,7 +687,7 @@ SELECT 1 FROM isolate_tenant_to_new_shard('dist_3', 100, shard_transfer_mode => SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_3') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- {} @@ -594,7 +707,7 @@ SELECT citus_shard_property_set(:shardgroup_18_shardid, anti_affinity=>true); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_3') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- [{"18": [{"isolate_placement.dist_3": true}]}] @@ -620,7 +733,7 @@ 
SELECT 1 FROM isolate_tenant_to_new_shard('dist_3', 1000, shard_transfer_mode => SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_3') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- [{"20": [{"isolate_placement.dist_3": true}]}] @@ -667,7 +780,7 @@ SELECT create_distributed_table('single_shard_2', null, colocate_with=>'single_s SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.single_shard_1') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- [{"1": [{"isolate_placement.single_shard_1": true}, {"isolate_placement.single_shard_2": true}]}] @@ -684,6 +797,9 @@ SELECT citus_shard_property_set(NULL, anti_affinity=>false); ERROR: shard_id cannot be NULL SELECT citus_shard_property_set(0, anti_affinity=>false); ERROR: shard xxxxx does not exist +-- we verify whether shard exists even if anti_affinity is not provided +SELECT citus_shard_property_set(0, anti_affinity=>NULL); +ERROR: shard xxxxx does not exist CREATE TABLE append_table (a int, b int); SELECT create_distributed_table('append_table', 'a', 'append'); create_distributed_table @@ -934,7 +1050,7 @@ CALL public.create_range_partitioned_shards('range_table_post', '{"0","25"}','{" SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; result --------------------------------------------------------------------- [{"1": [{"isolate_placement.dist_1": true}, {"isolate_placement.dist_2": true}, {"isolate_placement.dist_post_colocated": true}, {"isolate_placement.dist_post_concurrently_colocated": true}]}] diff --git a/src/test/regress/expected/isolation_create_distributed_table.out b/src/test/regress/expected/isolation_create_distributed_table.out index a44d05efe..714b6403f 100644 --- a/src/test/regress/expected/isolation_create_distributed_table.out +++ b/src/test/regress/expected/isolation_create_distributed_table.out @@ -108,3 +108,75 @@ ERROR: table "table_to_distribute" is already distributed step s2-commit: COMMIT; + +starting permutation: s1-create_distributed_table s1-begin s2-begin s1_set-shard-property s2-create_distributed_table_colocated s1-rollback s2-commit +step s1-create_distributed_table: + SELECT create_distributed_table('table_to_distribute', 'id'); + +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1_set-shard-property: + SELECT citus_shard_property_set(shardid, anti_affinity=>'true') + FROM pg_dist_shard WHERE logicalrelid = 'table_to_distribute'::regclass + ORDER BY shardid LIMIT 1; + +citus_shard_property_set +--------------------------------------------------------------------- + +(1 row) + +step s2-create_distributed_table_colocated: + SELECT create_distributed_table('table_to_colocate', 'id', colocate_with=>'table_to_distribute'); + +ERROR: could not acquire the lock required to colocate distributed table public.table_to_distribute +step s1-rollback: + ROLLBACK; + +step s2-commit: + COMMIT; + + +starting permutation: s1-create_distributed_table s1-begin s2-begin s2-create_distributed_table_colocated s1_set-shard-property s1-rollback s2-commit +step s1-create_distributed_table: + SELECT 
create_distributed_table('table_to_distribute', 'id'); + +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s2-create_distributed_table_colocated: + SELECT create_distributed_table('table_to_colocate', 'id', colocate_with=>'table_to_distribute'); + +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1_set-shard-property: + SELECT citus_shard_property_set(shardid, anti_affinity=>'true') + FROM pg_dist_shard WHERE logicalrelid = 'table_to_distribute'::regclass + ORDER BY shardid LIMIT 1; + +ERROR: could not acquire the lock required to set anti affinity property for a shard of public.table_to_distribute +step s1-rollback: + ROLLBACK; + +step s2-commit: + COMMIT; + diff --git a/src/test/regress/expected/isolation_create_distributed_table_concurrently.out b/src/test/regress/expected/isolation_create_distributed_table_concurrently.out index edc232518..f6f2702de 100644 --- a/src/test/regress/expected/isolation_create_distributed_table_concurrently.out +++ b/src/test/regress/expected/isolation_create_distributed_table_concurrently.out @@ -59,10 +59,10 @@ step s2-print-status: logicalrelid|shardid|shardstorage|shardminvalue|shardmaxvalue|needsisolatednode --------------------------------------------------------------------- -table_1 |1400294|t | -2147483648| -1073741825|f -table_1 |1400295|t | -1073741824| -1|f -table_1 |1400296|t | 0| 1073741823|f -table_1 |1400297|t | 1073741824| 2147483647|f +table_1 |1400306|t | -2147483648| -1073741825|f +table_1 |1400307|t | -1073741824| -1|f +table_1 |1400308|t | 0| 1073741823|f +table_1 |1400309|t | 1073741824| 2147483647|f (4 rows) count @@ -127,10 +127,10 @@ step s2-print-status: logicalrelid|shardid|shardstorage|shardminvalue|shardmaxvalue|needsisolatednode --------------------------------------------------------------------- -table_1 |1400299|t | -2147483648| -1073741825|f -table_1 |1400300|t | -1073741824| -1|f -table_1 |1400301|t | 0| 1073741823|f -table_1 |1400302|t | 1073741824| 2147483647|f +table_1 |1400311|t | -2147483648| -1073741825|f +table_1 |1400312|t | -1073741824| -1|f +table_1 |1400313|t | 0| 1073741823|f +table_1 |1400314|t | 1073741824| 2147483647|f (4 rows) count @@ -195,10 +195,10 @@ step s2-print-status: logicalrelid|shardid|shardstorage|shardminvalue|shardmaxvalue|needsisolatednode --------------------------------------------------------------------- -table_1 |1400304|t | -2147483648| -1073741825|f -table_1 |1400305|t | -1073741824| -1|f -table_1 |1400306|t | 0| 1073741823|f -table_1 |1400307|t | 1073741824| 2147483647|f +table_1 |1400316|t | -2147483648| -1073741825|f +table_1 |1400317|t | -1073741824| -1|f +table_1 |1400318|t | 0| 1073741823|f +table_1 |1400319|t | 1073741824| 2147483647|f (4 rows) count @@ -263,10 +263,10 @@ step s2-print-status: logicalrelid|shardid|shardstorage|shardminvalue|shardmaxvalue|needsisolatednode --------------------------------------------------------------------- -table_1 |1400309|t | -2147483648| -1073741825|f -table_1 |1400310|t | -1073741824| -1|f -table_1 |1400311|t | 0| 1073741823|f -table_1 |1400312|t | 1073741824| 2147483647|f +table_1 |1400321|t | -2147483648| -1073741825|f +table_1 |1400322|t | -1073741824| -1|f +table_1 |1400323|t | 0| 1073741823|f +table_1 |1400324|t | 1073741824| 2147483647|f (4 rows) count diff --git a/src/test/regress/spec/isolation_create_distributed_table.spec 
b/src/test/regress/spec/isolation_create_distributed_table.spec index 0934df358..e86a22104 100644 --- a/src/test/regress/spec/isolation_create_distributed_table.spec +++ b/src/test/regress/spec/isolation_create_distributed_table.spec @@ -1,11 +1,13 @@ setup { CREATE TABLE table_to_distribute(id int); + CREATE TABLE table_to_colocate(id int); } teardown { DROP TABLE table_to_distribute CASCADE; + DROP TABLE table_to_colocate CASCADE; } session "s1" @@ -20,6 +22,13 @@ step "s1-create_distributed_table" SELECT create_distributed_table('table_to_distribute', 'id'); } +step "s1_set-shard-property" +{ + SELECT citus_shard_property_set(shardid, anti_affinity=>'true') + FROM pg_dist_shard WHERE logicalrelid = 'table_to_distribute'::regclass + ORDER BY shardid LIMIT 1; +} + step "s1-copy_to_local_table" { COPY table_to_distribute FROM PROGRAM 'echo 0 && echo 1 && echo 2 && echo 3 && echo 4 && echo 5 && echo 6 && echo 7 && echo 8'; @@ -30,6 +39,11 @@ step "s1-commit" COMMIT; } +step "s1-rollback" +{ + ROLLBACK; +} + session "s2" step "s2-begin" @@ -42,6 +56,11 @@ step "s2-create_distributed_table" SELECT create_distributed_table('table_to_distribute', 'id'); } +step "s2-create_distributed_table_colocated" +{ + SELECT create_distributed_table('table_to_colocate', 'id', colocate_with=>'table_to_distribute'); +} + step "s2-copy_to_local_table" { COPY table_to_distribute FROM PROGRAM 'echo 0 && echo 1 && echo 2 && echo 3 && echo 4 && echo 5 && echo 6 && echo 7 && echo 8'; @@ -61,3 +80,7 @@ permutation "s1-begin" "s2-begin" "s2-copy_to_local_table" "s1-create_distribute //concurrent create_distributed_table on non-empty table permutation "s1-copy_to_local_table" "s1-begin" "s2-begin" "s1-create_distributed_table" "s2-create_distributed_table" "s1-commit" "s2-commit" + +//concurrent create_distributed_table vs citus_shard_property_set +permutation "s1-create_distributed_table" "s1-begin" "s2-begin" "s1_set-shard-property" "s2-create_distributed_table_colocated" "s1-rollback" "s2-commit" +permutation "s1-create_distributed_table" "s1-begin" "s2-begin" "s2-create_distributed_table_colocated" "s1_set-shard-property" "s1-rollback" "s2-commit" diff --git a/src/test/regress/sql/isolate_placement.sql b/src/test/regress/sql/isolate_placement.sql index b6f7c26b9..bf8d01f2a 100644 --- a/src/test/regress/sql/isolate_placement.sql +++ b/src/test/regress/sql/isolate_placement.sql @@ -16,8 +16,16 @@ SET search_path TO isolate_placement; SELECT citus_internal_shard_group_set_needsisolatednode(0, NULL); SELECT citus_internal_shard_group_set_needsisolatednode(NULL, false); +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 2000000; + +CREATE TABLE single_shard_1(a int); +SELECT create_distributed_table('single_shard_1', null, colocate_with=>'none'); + -- test with user that doesn't have permission to execute the function -SELECT citus_internal_shard_group_set_needsisolatednode(0, true); +SELECT citus_internal_shard_group_set_needsisolatednode(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; + +DROP TABLE single_shard_1; CREATE ROLE test_user_isolate_placement WITH LOGIN; GRANT ALL ON SCHEMA isolate_placement TO test_user_isolate_placement; @@ -46,8 +54,61 @@ SELECT pg_reload_conf(); SELECT pg_sleep(0.1); SET search_path TO isolate_placement; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 2001000; -SET citus.next_shard_id TO 2000000; +CREATE USER mysuperuser superuser; +SET ROLE mysuperuser; + +CREATE TABLE single_shard_1(a int); 
+SELECT create_distributed_table('single_shard_1', null, colocate_with=>'none'); + +CREATE USER regularuser; +GRANT USAGE ON SCHEMA isolate_placement TO regularuser; +ALTER SYSTEM SET citus.enable_manual_metadata_changes_for_user TO 'regularuser'; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); + +SET ROLE regularuser; + +-- throws an error as the user is not the owner of the table +SELECT citus_shard_property_set(shardid) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; +SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; +SELECT citus_internal_shard_group_set_needsisolatednode(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; + +-- assign all tables to regularuser +RESET ROLE; +SELECT result FROM run_command_on_all_nodes($$ REASSIGN OWNED BY mysuperuser TO regularuser; $$); + +SET ROLE regularuser; + +SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; + +SELECT result FROM run_command_on_all_nodes($$ + SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.single_shard_1') +$$) +ORDER BY result; + +SELECT citus_internal_shard_group_set_needsisolatednode(shardid, false) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; + +SELECT result FROM run_command_on_all_nodes($$ + SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.single_shard_1') +$$) +ORDER BY result; + +SELECT citus_internal_shard_group_set_needsisolatednode(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; + +DROP TABLE single_shard_1; +RESET ROLE; +REVOKE USAGE ON SCHEMA isolate_placement FROM regularuser; +ALTER SYSTEM RESET citus.enable_manual_metadata_changes_for_user; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); +DROP ROLE regularuser, mysuperuser; + +SET search_path TO isolate_placement; + +SET citus.next_shard_id TO 2002000; SET citus.shard_count TO 32; SET citus.shard_replication_factor TO 1; @@ -70,7 +131,7 @@ SET citus.shard_replication_factor TO 1; SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; SELECT shardids[2] AS shardgroup_5_shardid FROM public.get_enumerated_shard_groups('isolate_placement.dist_1') @@ -87,7 +148,7 @@ SELECT citus_shard_property_set(:shardgroup_10_shardid, anti_affinity=>true); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; SELECT shardids[1] AS shardgroup_3_shardid FROM public.get_enumerated_shard_groups('isolate_placement.dist_1') @@ -98,7 +159,7 @@ SELECT citus_shard_property_set(:shardgroup_3_shardid, anti_affinity=>false); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; SELECT shardids[1] AS shardgroup_10_shardid FROM public.get_enumerated_shard_groups('isolate_placement.dist_1') @@ -109,7 +170,7 @@ SELECT citus_shard_property_set(:shardgroup_10_shardid, anti_affinity=>false); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM 
public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; SELECT shardids[1] AS shardgroup_5_shardid FROM public.get_enumerated_shard_groups('isolate_placement.dist_1') @@ -120,7 +181,7 @@ SELECT citus_shard_property_set(:shardgroup_5_shardid, anti_affinity=>true); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; -- test metadata sync @@ -148,7 +209,7 @@ SELECT citus_shard_property_set(:shardgroup_5_shardid, anti_affinity=>true); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; CREATE TABLE dist_4(a int); SELECT create_distributed_table('dist_4', 'a', colocate_with=>'dist_1'); @@ -161,7 +222,7 @@ SELECT create_distributed_table_concurrently('dist_4_concurrently', 'a', colocat SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; DROP TABLE dist_4, dist_4_concurrently; @@ -224,7 +285,7 @@ FROM get_candidate_node_for_shard_transfer(:shardgroup_15_shardid); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; DROP TABLE dist_1, dist_2, dist_3; @@ -242,7 +303,7 @@ SELECT citus_shard_property_set(:shardgroup_3_shardid, anti_affinity=>true); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; -- so that replicate_table_shards works UPDATE pg_dist_partition SET repmodel = 'c' WHERE logicalrelid = 'isolate_placement.dist_1'::regclass; @@ -254,7 +315,7 @@ SET client_min_messages TO NOTICE; SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; DROP TABLE dist_1, dist_2; @@ -272,7 +333,7 @@ SELECT citus_shard_property_set(:shardgroup_9_shardid, anti_affinity=>true); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset @@ -290,7 +351,7 @@ WHERE shardid = :shardgroup_9_shardid; SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; SELECT shardids[1] AS shardgroup_12_shardid FROM public.get_enumerated_shard_groups('isolate_placement.dist_1') @@ -301,7 +362,7 @@ SELECT citus_shard_property_set(:shardgroup_12_shardid, anti_affinity=>true); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; SELECT shardids[1] AS shardgroup_10_shardid FROM public.get_enumerated_shard_groups('isolate_placement.dist_1') @@ -320,7 +381,7 @@ WHERE shardid = :shardgroup_10_shardid; SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY 
nodeid; +ORDER BY result; CREATE TABLE dist_3(a int); SELECT create_distributed_table('dist_3', 'a', colocate_with=>'none'); @@ -334,7 +395,7 @@ SELECT citus_shard_property_set(:shardgroup_17_shardid, anti_affinity=>true); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_3') $$) -ORDER BY nodeid; +ORDER BY result; -- verify that shard key value 100 is stored on shard group 17 select get_shard_id_for_distribution_column('dist_3', 100) = :shardgroup_17_shardid; @@ -346,7 +407,7 @@ SELECT 1 FROM isolate_tenant_to_new_shard('dist_3', 100, shard_transfer_mode => SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_3') $$) -ORDER BY nodeid; +ORDER BY result; SELECT shardids[1] AS shardgroup_18_shardid FROM public.get_enumerated_shard_groups('isolate_placement.dist_3') @@ -357,7 +418,7 @@ SELECT citus_shard_property_set(:shardgroup_18_shardid, anti_affinity=>true); SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_3') $$) -ORDER BY nodeid; +ORDER BY result; -- verify that shard key value 1000 is _not_ stored on shard group 18 SELECT get_shard_id_for_distribution_column('dist_3', 1000) != :shardgroup_18_shardid; @@ -369,8 +430,7 @@ SELECT 1 FROM isolate_tenant_to_new_shard('dist_3', 1000, shard_transfer_mode => SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_3') $$) -ORDER BY nodeid; - +ORDER BY result; CREATE TABLE single_shard_1(a int); SELECT create_distributed_table('single_shard_1', null, colocate_with=>'none'); @@ -391,7 +451,7 @@ SELECT create_distributed_table('single_shard_2', null, colocate_with=>'single_s SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.single_shard_1') $$) -ORDER BY nodeid; +ORDER BY result; -- test invalid input SELECT citus_shard_property_set(NULL, anti_affinity=>true); @@ -399,6 +459,9 @@ SELECT citus_shard_property_set(0, anti_affinity=>true); SELECT citus_shard_property_set(NULL, anti_affinity=>false); SELECT citus_shard_property_set(0, anti_affinity=>false); +-- we verify whether shard exists even if anti_affinity is not provided +SELECT citus_shard_property_set(0, anti_affinity=>NULL); + CREATE TABLE append_table (a int, b int); SELECT create_distributed_table('append_table', 'a', 'append'); SELECT 1 FROM master_create_empty_shard('append_table'); @@ -514,7 +577,7 @@ CALL public.create_range_partitioned_shards('range_table_post', '{"0","25"}','{" SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') $$) -ORDER BY nodeid; +ORDER BY result; -- Make sure that the node that contains shard-group 1 of isolate_placement.dist_1 -- doesn't have any other placements.
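
A note on the locking design above: per the metadata_utility.c hunk, citus_shard_property_set now (1) verifies that the shard exists, (2) ensures the current user owns every colocated table, (3) takes the placement colocation lock in ExclusiveLock mode, and (4) locks each colocated relation with ShareUpdateExclusiveLock. This ordering is what makes the function conflict with a concurrent create_distributed_table(..., colocate_with => ...) in the isolation tests. The relation-level locks can be observed from within the transaction; the snippet below is a sketch for illustration only (dist_1 stands in for any hash-distributed or single-shard table) and is not part of the regression suite:

BEGIN;
SELECT citus_shard_property_set(shardid, anti_affinity=>true)
FROM pg_dist_shard
WHERE logicalrelid = 'dist_1'::regclass
ORDER BY shardid LIMIT 1;
-- expect one ShareUpdateExclusiveLock row per colocated relation
SELECT relation::regclass AS locked_relation, mode
FROM pg_locks
WHERE locktype = 'relation'
  AND mode = 'ShareUpdateExclusiveLock'
  AND pid = pg_backend_pid();
ROLLBACK;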
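
Similarly, the end-to-end effect of the property can be checked directly against pg_dist_shard, whose needsisolatednode column appears in the isolation test output above. Again a sketch with a made-up table name (events) rather than part of the test suite:

CREATE TABLE events (a int);
SELECT create_distributed_table('events', 'a');

-- mark the shard group of the table's first shard as needing an isolated node
SELECT citus_shard_property_set(shardid, anti_affinity=>true)
FROM pg_dist_shard
WHERE logicalrelid = 'events'::regclass
ORDER BY shardid LIMIT 1;

-- exactly one shard of the table should now have the flag set
SELECT shardid, needsisolatednode
FROM pg_dist_shard
WHERE logicalrelid = 'events'::regclass
ORDER BY shardid;

-- clearing works the same way, with anti_affinity=>false
SELECT citus_shard_property_set(shardid, anti_affinity=>false)
FROM pg_dist_shard
WHERE logicalrelid = 'events'::regclass
ORDER BY shardid LIMIT 1;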