From deca9778dbf2d84bc775ba408399dc01c447e88f Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Tue, 31 Oct 2023 14:33:36 +0300 Subject: [PATCH] address feedback --- .../distributed/metadata/metadata_sync.c | 33 +++-- .../distributed/metadata/metadata_utility.c | 134 ++++++++++-------- .../rebalancer_placement_separation.c | 25 ++-- .../distributed/operations/shard_rebalancer.c | 2 +- .../distributed/sql/citus--12.1-1--12.2-1.sql | 2 +- .../sql/downgrades/citus--12.2-1--12.1-1.sql | 4 +- .../12.2-1.sql | 6 - .../latest.sql | 6 - .../12.2-1.sql | 6 + .../latest.sql | 6 + src/include/distributed/metadata_sync.h | 2 +- src/include/distributed/metadata_utility.h | 2 +- .../rebalancer_placement_separation.h | 1 - .../regress/expected/isolate_placement.out | 102 +++++++++---- src/test/regress/expected/multi_extension.out | 2 +- .../regress/expected/multi_test_helpers.out | 18 +++ .../upgrade_isolate_placement_after.out | 8 +- .../expected/upgrade_list_citus_objects.out | 2 +- src/test/regress/sql/isolate_placement.sql | 66 +++++---- src/test/regress/sql/multi_test_helpers.sql | 19 +++ .../sql/upgrade_isolate_placement_after.sql | 8 +- 21 files changed, 283 insertions(+), 171 deletions(-) delete mode 100644 src/backend/distributed/sql/udfs/citus_internal_shard_group_set_needsseparatenode/12.2-1.sql delete mode 100644 src/backend/distributed/sql/udfs/citus_internal_shard_group_set_needsseparatenode/latest.sql create mode 100644 src/backend/distributed/sql/udfs/citus_internal_shard_property_set/12.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_internal_shard_property_set/latest.sql diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index debf7a98d..4e8e45e2f 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -179,7 +179,7 @@ PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema); PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema); PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata); -PG_FUNCTION_INFO_V1(citus_internal_shard_group_set_needsseparatenode); +PG_FUNCTION_INFO_V1(citus_internal_shard_property_set); static bool got_SIGTERM = false; @@ -3902,21 +3902,18 @@ citus_internal_update_none_dist_table_metadata(PG_FUNCTION_ARGS) /* - * citus_internal_shard_group_set_needsseparatenode is an internal UDF to - * set needsseparatenode flag for all the shards within the shard group + * citus_internal_shard_property_set is an internal UDF to + * set shard properties for all the shards within the shard group * that given shard belongs to. 
*/ Datum -citus_internal_shard_group_set_needsseparatenode(PG_FUNCTION_ARGS) +citus_internal_shard_property_set(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); PG_ENSURE_ARGNOTNULL(0, "shard_id"); uint64 shardId = PG_GETARG_INT64(0); - PG_ENSURE_ARGNOTNULL(1, "enabled"); - bool enabled = PG_GETARG_BOOL(1); - /* only owner of the table (or superuser) is allowed to modify the Citus metadata */ Oid distributedRelationId = RelationIdForShard(shardId); EnsureTableOwner(distributedRelationId); @@ -3929,7 +3926,15 @@ citus_internal_shard_group_set_needsseparatenode(PG_FUNCTION_ARGS) EnsureCoordinatorInitiatedOperation(); } - ShardGroupSetNeedsSeparateNode(shardId, enabled); + bool *needsSeparateNodePtr = NULL; + + if (!PG_ARGISNULL(1)) + { + needsSeparateNodePtr = palloc(sizeof(bool)); + *needsSeparateNodePtr = PG_GETARG_BOOL(1); + } + + ShardgroupSetProperty(shardId, needsSeparateNodePtr); PG_RETURN_VOID(); } @@ -4135,16 +4140,18 @@ UpdateNoneDistTableMetadataCommand(Oid relationId, char replicationModel, /* - * ShardGroupSetNeedsSeparateNodeCommand returns a command to call - * citus_internal_shard_group_set_needsseparatenode(). + * ShardgroupSetPropertyCommand returns a command to call + * citus_internal_shard_property_set(). */ char * -ShardGroupSetNeedsSeparateNodeCommand(uint64 shardId, bool enabled) +ShardgroupSetPropertyCommand(uint64 shardId, bool *needsSeparateNodePtr) { + char *needsSeparateNodeStr = !needsSeparateNodePtr ? "null" : + (*needsSeparateNodePtr ? "true" : "false"); StringInfo command = makeStringInfo(); appendStringInfo(command, - "SELECT pg_catalog.citus_internal_shard_group_set_needsseparatenode(%lu, %s)", - shardId, enabled ? "true" : "false"); + "SELECT pg_catalog.citus_internal_shard_property_set(%lu, %s)", + shardId, needsSeparateNodeStr); return command->data; } diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index ab7470b21..2f385bbce 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -115,9 +115,7 @@ static void AppendShardIdNameValues(StringInfo selectQuery, ShardInterval *shard static HeapTuple CreateDiskSpaceTuple(TupleDesc tupleDesc, uint64 availableBytes, uint64 totalBytes); static bool GetLocalDiskSpaceStats(uint64 *availableBytes, uint64 *totalBytes); -static void CitusShardPropertySetAntiAffinity(uint64 shardId, bool enabled); -static void ShardGroupSetNeedsSeparateNodeGlobally(uint64 shardId, bool enabled); -static void ShardSetNeedsSeparateNode(uint64 shardId, bool enabled); +static void ShardgroupSetPropertyGlobally(uint64 shardId, bool *needsSeparateNodePtr); static BackgroundTask * DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple); @@ -386,87 +384,81 @@ citus_shard_property_set(PG_FUNCTION_ARGS) Oid colocatedTableId = InvalidOid; foreach_oid(colocatedTableId, colocatedTableList) { - LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock); + /* + * Prevent relations from being dropped while we are setting the + * property. 
+ */ + LockRelationOid(colocatedTableId, AccessShareLock); } + bool *needsSeparateNodePtr = NULL; + if (!PG_ARGISNULL(1)) { - bool antiAffinity = PG_GETARG_BOOL(1); - CitusShardPropertySetAntiAffinity(shardId, antiAffinity); + Oid distributedRelationId = RelationIdForShard(shardId); + if (!IsCitusTableType(distributedRelationId, HASH_DISTRIBUTED) && + !IsCitusTableType(distributedRelationId, SINGLE_SHARD_DISTRIBUTED)) + { + ereport(ERROR, (errmsg("setting anti-affinity property is only " + "supported for hash distributed tables"))); + } + + needsSeparateNodePtr = palloc(sizeof(bool)); + *needsSeparateNodePtr = PG_GETARG_BOOL(1); } + ShardgroupSetPropertyGlobally(shardId, needsSeparateNodePtr); + PG_RETURN_VOID(); } /* - * CitusShardPropertySetAntiAffinity is an helper function for - * citus_shard_property_set UDF to set anti_affinity property for given - * shard. - */ -static void -CitusShardPropertySetAntiAffinity(uint64 shardId, bool enabled) -{ - Oid distributedRelationId = RelationIdForShard(shardId); - if (!IsCitusTableType(distributedRelationId, HASH_DISTRIBUTED) && - !IsCitusTableType(distributedRelationId, SINGLE_SHARD_DISTRIBUTED)) - { - ereport(ERROR, (errmsg("setting anti-affinity property is only " - "supported for hash distributed tables"))); - } - - ShardGroupSetNeedsSeparateNodeGlobally(shardId, enabled); -} - - -/* - * ShardGroupSetNeedsSeparateNodeGlobally calls ShardGroupSetNeedsSeparateNode + * ShardgroupSetPropertyGlobally calls ShardgroupSetProperty * on all nodes. */ static void -ShardGroupSetNeedsSeparateNodeGlobally(uint64 shardId, bool enabled) +ShardgroupSetPropertyGlobally(uint64 shardId, bool *needsSeparateNodePtr) { - ShardGroupSetNeedsSeparateNode(shardId, enabled); + ShardgroupSetProperty(shardId, needsSeparateNodePtr); char *metadataCommand = - ShardGroupSetNeedsSeparateNodeCommand(shardId, enabled); + ShardgroupSetPropertyCommand(shardId, needsSeparateNodePtr); SendCommandToWorkersWithMetadata(metadataCommand); } /* - * ShardGroupSetNeedsSeparateNode sets the needsseparatenode flag to desired - * value for all the shards within the shard group that given shard belongs - * to. + * ShardgroupSetProperty sets shard properties for all the shards within + * the shard group that given shard belongs to. */ void -ShardGroupSetNeedsSeparateNode(uint64 shardId, bool enabled) +ShardgroupSetProperty(uint64 shardId, bool *needsSeparateNodePtr) { ShardInterval *shardInterval = LoadShardInterval(shardId); List *colocatedShardIntervalList = ColocatedShardIntervalList(shardInterval); + int nShardInterval = list_length(colocatedShardIntervalList); + Datum *shardIdDatumArray = (Datum *) palloc(nShardInterval * sizeof(Datum)); + + int shardIndex = 0; ShardInterval *colocatedShardInterval = NULL; foreach_ptr(colocatedShardInterval, colocatedShardIntervalList) { - ShardSetNeedsSeparateNode(colocatedShardInterval->shardId, - enabled); + shardIdDatumArray[shardIndex] = UInt64GetDatum(colocatedShardInterval->shardId); + shardIndex++; } -} + ArrayType *shardIdArrayDatum = DatumArrayToArrayType(shardIdDatumArray, + nShardInterval, INT8OID); -/* - * ShardSetNeedsSeparateNode sets the needsseparatenode flag to desired - * value for the given shard. 
- */ -static void -ShardSetNeedsSeparateNode(uint64 shardId, bool enabled) -{ Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock); ScanKeyData scanKey[1]; int scanKeyCount = 1; ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid, - BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId)); + BTEqualStrategyNumber, F_INT8EQ, PointerGetDatum(shardIdArrayDatum)); + scanKey[0].sk_flags |= SK_SEARCHARRAY; bool indexOK = true; Oid indexId = DistShardShardidIndexId(); @@ -474,14 +466,6 @@ ShardSetNeedsSeparateNode(uint64 shardId, bool enabled) indexId, indexOK, NULL, scanKeyCount, scanKey); - HeapTuple heapTuple = systable_getnext(scanDescriptor); - if (!HeapTupleIsValid(heapTuple)) - { - ereport(ERROR, (errmsg("could not find valid entry for shard " - UINT64_FORMAT, - shardId))); - } - Datum values[Natts_pg_dist_shard]; bool isnull[Natts_pg_dist_shard]; bool replace[Natts_pg_dist_shard]; @@ -490,16 +474,46 @@ ShardSetNeedsSeparateNode(uint64 shardId, bool enabled) memset(isnull, false, sizeof(isnull)); memset(replace, false, sizeof(replace)); - values[Anum_pg_dist_shard_needsseparatenode - 1] = BoolGetDatum(enabled); - isnull[Anum_pg_dist_shard_needsseparatenode - 1] = false; - replace[Anum_pg_dist_shard_needsseparatenode - 1] = true; + if (needsSeparateNodePtr) + { + values[Anum_pg_dist_shard_needsseparatenode - 1] = BoolGetDatum( + *needsSeparateNodePtr); + isnull[Anum_pg_dist_shard_needsseparatenode - 1] = false; + replace[Anum_pg_dist_shard_needsseparatenode - 1] = true; + } - TupleDesc tupleDescriptor = RelationGetDescr(pgDistShard); - heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); + bool updatedAny = false; - CatalogTupleUpdate(pgDistShard, &heapTuple->t_self, heapTuple); + CatalogIndexState indexState = CatalogOpenIndexes(pgDistShard); - CitusInvalidateRelcacheByShardId(shardId); + HeapTuple heapTuple = NULL; + while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) + { + TupleDesc tupleDescriptor = RelationGetDescr(pgDistShard); + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, + replace); + + CatalogTupleUpdateWithInfo(pgDistShard, &heapTuple->t_self, heapTuple, + indexState); + + updatedAny = true; + } + + if (!updatedAny) + { + ereport(ERROR, (errmsg("could not find valid entry for shard " + UINT64_FORMAT, + shardId))); + } + + CatalogCloseIndexes(indexState); + + /* + * We don't need to send invalidations for all the shards as + * CitusInvalidateRelcacheByShardId() will send the invalidation based on + * id of the belonging distributed table, not just for the input shard. + */ + CitusInvalidateRelcacheByShardId(shardInterval->shardId); CommandCounterIncrement(); diff --git a/src/backend/distributed/operations/rebalancer_placement_separation.c b/src/backend/distributed/operations/rebalancer_placement_separation.c index ff04fe52b..e7658fc80 100644 --- a/src/backend/distributed/operations/rebalancer_placement_separation.c +++ b/src/backend/distributed/operations/rebalancer_placement_separation.c @@ -25,7 +25,7 @@ #include "distributed/shard_rebalancer.h" -struct RebalancerPlacementSeparationContext +typedef struct RebalancerPlacementSeparationContext { /* * Hash table where each entry is of the form NodeToPlacementGroupHashEntry, @@ -33,7 +33,7 @@ struct RebalancerPlacementSeparationContext * a NodeToPlacementGroupHashEntry. 
*/ HTAB *nodePlacementGroupHash; -}; +} RebalancerPlacementSeparationContext; /* @@ -73,7 +73,7 @@ typedef struct NodeToPlacementGroupHashEntry * Shardgroup placement that is assigned to this node to be separated * from others in the cluster. * - * NULL if no shardgroup placement is assigned yet. + * NULL if no shardgroup placement is not assigned yet. */ ShardgroupPlacement *assignedPlacementGroup; } NodeToPlacementGroupHashEntry; @@ -89,12 +89,12 @@ static void TryAssignPlacementGroupsToNodeGroups( RebalancerPlacementSeparationContext *context, List *activeWorkerNodeList, List *rebalancePlacementList, - FmgrInfo *shardAllowedOnNodeUDF); + FmgrInfo shardAllowedOnNodeUDF); static bool TryAssignPlacementGroupToNodeGroup( RebalancerPlacementSeparationContext *context, int32 candidateNodeGroupId, ShardPlacement *shardPlacement, - FmgrInfo *shardAllowedOnNodeUDF); + FmgrInfo shardAllowedOnNodeUDF); /* other helpers */ @@ -109,15 +109,15 @@ static List * PlacementListGetUniqueNodeGroupIds(List *placementList); RebalancerPlacementSeparationContext * PrepareRebalancerPlacementSeparationContext(List *activeWorkerNodeList, List *rebalancePlacementList, - FmgrInfo *shardAllowedOnNodeUDF) + FmgrInfo shardAllowedOnNodeUDF) { HTAB *nodePlacementGroupHash = - CreateSimpleHashWithNameAndSize(uint32, NodeToPlacementGroupHashEntry, + CreateSimpleHashWithNameAndSize(int32, NodeToPlacementGroupHashEntry, "NodeToPlacementGroupHash", list_length(activeWorkerNodeList)); RebalancerPlacementSeparationContext *context = - palloc(sizeof(RebalancerPlacementSeparationContext)); + palloc0(sizeof(RebalancerPlacementSeparationContext)); context->nodePlacementGroupHash = nodePlacementGroupHash; activeWorkerNodeList = SortList(activeWorkerNodeList, CompareWorkerNodes); @@ -158,8 +158,7 @@ InitRebalancerPlacementSeparationContext(RebalancerPlacementSeparationContext *c hash_search(nodePlacementGroupHash, &workerNode->groupId, HASH_ENTER, NULL); - nodePlacementGroupHashEntry->shouldHaveShards = - workerNode->shouldHaveShards; + nodePlacementGroupHashEntry->shouldHaveShards = workerNode->shouldHaveShards; nodePlacementGroupHashEntry->hasPlacementsThatCannotBeMovedAway = false; nodePlacementGroupHashEntry->assignedPlacementGroup = NULL; @@ -219,7 +218,7 @@ static void TryAssignPlacementGroupsToNodeGroups(RebalancerPlacementSeparationContext *context, List *activeWorkerNodeList, List *rebalancePlacementList, - FmgrInfo *shardAllowedOnNodeUDF) + FmgrInfo shardAllowedOnNodeUDF) { List *unassignedPlacementList = NIL; @@ -294,7 +293,7 @@ static bool TryAssignPlacementGroupToNodeGroup(RebalancerPlacementSeparationContext *context, int32 candidateNodeGroupId, ShardPlacement *shardPlacement, - FmgrInfo *shardAllowedOnNodeUDF) + FmgrInfo shardAllowedOnNodeUDF) { HTAB *nodePlacementGroupHash = context->nodePlacementGroupHash; @@ -330,7 +329,7 @@ TryAssignPlacementGroupToNodeGroup(RebalancerPlacementSeparationContext *context } WorkerNode *workerNode = PrimaryNodeForGroup(candidateNodeGroupId, NULL); - Datum allowed = FunctionCall2(shardAllowedOnNodeUDF, shardPlacement->shardId, + Datum allowed = FunctionCall2(&shardAllowedOnNodeUDF, shardPlacement->shardId, workerNode->nodeId); if (!DatumGetBool(allowed)) { diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index ab4f2d3c8..06c188bb1 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -598,7 +598,7 @@ GetRebalanceSteps(RebalanceOptions 
*options) PrepareRebalancerPlacementSeparationContext( activeWorkerList, FlattenNestedList(activeShardPlacementListList), - &context.shardAllowedOnNodeUDF); + context.shardAllowedOnNodeUDF); return RebalancePlacementUpdates(activeWorkerList, activeShardPlacementListList, diff --git a/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql b/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql index 6883e5622..fbc59c2ee 100644 --- a/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql +++ b/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql @@ -9,7 +9,7 @@ ALTER TABLE pg_dist_shard ADD COLUMN needsseparatenode boolean NOT NULL DEFAULT DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text); #include "udfs/citus_internal_add_shard_metadata/12.2-1.sql" -#include "udfs/citus_internal_shard_group_set_needsseparatenode/12.2-1.sql" +#include "udfs/citus_internal_shard_property_set/12.2-1.sql" #include "udfs/citus_shard_property_set/12.2-1.sql" DROP VIEW citus_shards; diff --git a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql index 61a335192..73be69b97 100644 --- a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql @@ -10,8 +10,8 @@ DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "ch DROP FUNCTION pg_catalog.citus_shard_property_set(shard_id bigint, anti_affinity boolean); -DROP FUNCTION pg_catalog.citus_internal_shard_group_set_needsseparatenode( +DROP FUNCTION pg_catalog.citus_internal_shard_property_set( shard_id bigint, - enabled boolean); + needs_separate_node boolean); ALTER TABLE pg_dist_shard DROP COLUMN needsseparatenode; diff --git a/src/backend/distributed/sql/udfs/citus_internal_shard_group_set_needsseparatenode/12.2-1.sql b/src/backend/distributed/sql/udfs/citus_internal_shard_group_set_needsseparatenode/12.2-1.sql deleted file mode 100644 index 541c8cf14..000000000 --- a/src/backend/distributed/sql/udfs/citus_internal_shard_group_set_needsseparatenode/12.2-1.sql +++ /dev/null @@ -1,6 +0,0 @@ -CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_shard_group_set_needsseparatenode( - shard_id bigint, - enabled boolean) - RETURNS void - LANGUAGE C VOLATILE - AS 'MODULE_PATHNAME', $$citus_internal_shard_group_set_needsseparatenode$$; diff --git a/src/backend/distributed/sql/udfs/citus_internal_shard_group_set_needsseparatenode/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_shard_group_set_needsseparatenode/latest.sql deleted file mode 100644 index 541c8cf14..000000000 --- a/src/backend/distributed/sql/udfs/citus_internal_shard_group_set_needsseparatenode/latest.sql +++ /dev/null @@ -1,6 +0,0 @@ -CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_shard_group_set_needsseparatenode( - shard_id bigint, - enabled boolean) - RETURNS void - LANGUAGE C VOLATILE - AS 'MODULE_PATHNAME', $$citus_internal_shard_group_set_needsseparatenode$$; diff --git a/src/backend/distributed/sql/udfs/citus_internal_shard_property_set/12.2-1.sql b/src/backend/distributed/sql/udfs/citus_internal_shard_property_set/12.2-1.sql new file mode 100644 index 000000000..e4ac28be7 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_shard_property_set/12.2-1.sql @@ -0,0 +1,6 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_shard_property_set( + shard_id bigint, + needs_separate_node boolean) + RETURNS void + LANGUAGE C VOLATILE + AS 
'MODULE_PATHNAME', $$citus_internal_shard_property_set$$; diff --git a/src/backend/distributed/sql/udfs/citus_internal_shard_property_set/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_shard_property_set/latest.sql new file mode 100644 index 000000000..e4ac28be7 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_shard_property_set/latest.sql @@ -0,0 +1,6 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_shard_property_set( + shard_id bigint, + needs_separate_node boolean) + RETURNS void + LANGUAGE C VOLATILE + AS 'MODULE_PATHNAME', $$citus_internal_shard_property_set$$; diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index e1755a27d..8a29ed3fe 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -141,7 +141,7 @@ extern char * TenantSchemaInsertCommand(Oid schemaId, uint32 colocationId); extern char * TenantSchemaDeleteCommand(char *schemaName); extern char * UpdateNoneDistTableMetadataCommand(Oid relationId, char replicationModel, uint32 colocationId, bool autoConverted); -extern char * ShardGroupSetNeedsSeparateNodeCommand(uint64 shardId, bool enabled); +extern char * ShardgroupSetPropertyCommand(uint64 shardId, bool *needsSeparateNodePtr); extern char * AddPlacementMetadataCommand(uint64 shardId, uint64 placementId, uint64 shardLength, int32 groupId); extern char * DeletePlacementMetadataCommand(uint64 placementId); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index eeb0b34b4..1fbce7534 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -447,7 +447,7 @@ extern List * SendShardStatisticsQueriesInParallel(List *citusTableIds, extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection, uint64 *availableBytes, uint64 *totalBytes); -extern void ShardGroupSetNeedsSeparateNode(uint64 shardId, bool enabled); +extern void ShardgroupSetProperty(uint64 shardId, bool *needsSeparateNodePtr); extern void ExecuteQueryViaSPI(char *query, int SPIOK); extern void ExecuteAndLogQueryViaSPI(char *query, int SPIOK, int logLevel); extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid diff --git a/src/include/distributed/rebalancer_placement_separation.h b/src/include/distributed/rebalancer_placement_separation.h index 133265ca6..94a343632 100644 --- a/src/include/distributed/rebalancer_placement_separation.h +++ b/src/include/distributed/rebalancer_placement_separation.h @@ -27,7 +27,6 @@ extern RebalancerPlacementSeparationContext * PrepareRebalancerPlacementSeparati * activeShardPlacementList, FmgrInfo - * shardAllowedOnNodeUDF); extern bool RebalancerPlacementSeparationContextPlacementIsAllowedOnWorker( RebalancerPlacementSeparationContext *context, diff --git a/src/test/regress/expected/isolate_placement.out b/src/test/regress/expected/isolate_placement.out index 354fa4fb0..688ed8116 100644 --- a/src/test/regress/expected/isolate_placement.out +++ b/src/test/regress/expected/isolate_placement.out @@ -7,32 +7,10 @@ SET client_min_messages TO WARNING; CALL citus_cleanup_orphaned_resources(); RESET client_min_messages; --- Returns true if all placement groups within given shard group are isolated. --- --- Not created in isolate_placement schema because it's dropped a few times during the test. 
-CREATE OR REPLACE FUNCTION verify_placements_in_shard_group_isolated( - qualified_table_name text, - shard_group_index bigint) -RETURNS boolean -AS $func$ -DECLARE - v_result boolean; - BEGIN - SELECT bool_and(has_separate_node) INTO v_result - FROM citus_shards - JOIN ( - SELECT shardids FROM public.get_enumerated_shard_groups(qualified_table_name) WHERE shardgroupindex = shard_group_index - ) q - ON (shardid = ANY(q.shardids)); - RETURN v_result; - END; -$func$ LANGUAGE plpgsql; CREATE SCHEMA isolate_placement; SET search_path TO isolate_placement; -- test null input -SELECT citus_internal_shard_group_set_needsseparatenode(0, NULL); -ERROR: enabled cannot be NULL -SELECT citus_internal_shard_group_set_needsseparatenode(NULL, false); +SELECT citus_internal_shard_property_set(NULL, false); ERROR: shard_id cannot be NULL SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 2000000; @@ -44,7 +22,7 @@ SELECT create_distributed_table('single_shard_1', null, colocate_with=>'none'); (1 row) -- test with user that doesn't have permission to execute the function -SELECT citus_internal_shard_group_set_needsseparatenode(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; +SELECT citus_internal_shard_property_set(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; ERROR: This is an internal Citus function can only be used in a distributed transaction DROP TABLE single_shard_1; CREATE ROLE test_user_isolate_placement WITH LOGIN; @@ -64,7 +42,7 @@ SELECT pg_sleep(0.1); SET ROLE test_user_isolate_placement; -- test invalid shard id -SELECT citus_internal_shard_group_set_needsseparatenode(0, true); +SELECT citus_internal_shard_property_set(0, true); ERROR: could not find valid entry for shard xxxxx -- test null needs_separate_node SELECT citus_internal_add_shard_metadata( @@ -124,7 +102,7 @@ SELECT citus_shard_property_set(shardid) FROM pg_dist_shard WHERE logicalrelid = ERROR: must be owner of table single_shard_1 SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; ERROR: must be owner of table single_shard_1 -SELECT citus_internal_shard_group_set_needsseparatenode(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; +SELECT citus_internal_shard_property_set(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; ERROR: must be owner of table single_shard_1 -- assign all tables to regularuser RESET ROLE; @@ -154,8 +132,8 @@ ORDER BY result; [{"1": [{"isolate_placement.single_shard_1": true}]}] (3 rows) -SELECT citus_internal_shard_group_set_needsseparatenode(shardid, false) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; - citus_internal_shard_group_set_needsseparatenode +SELECT citus_internal_shard_property_set(shardid, false) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; + citus_internal_shard_property_set --------------------------------------------------------------------- (1 row) @@ -171,8 +149,8 @@ ORDER BY result; {} (3 rows) -SELECT citus_internal_shard_group_set_needsseparatenode(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; - citus_internal_shard_group_set_needsseparatenode +SELECT citus_internal_shard_property_set(shardid, true) FROM pg_dist_shard WHERE 
logicalrelid = 'isolate_placement.single_shard_1'::regclass; + citus_internal_shard_property_set --------------------------------------------------------------------- (1 row) @@ -244,6 +222,70 @@ ORDER BY result; SELECT shardids[2] AS shardgroup_5_shardid FROM public.get_enumerated_shard_groups('isolate_placement.dist_1') WHERE shardgroupindex = 5 \gset +-- no-op .. +SELECT citus_shard_property_set(:shardgroup_5_shardid, NULL); + citus_shard_property_set +--------------------------------------------------------------------- + +(1 row) + +CREATE ROLE test_user_isolate_placement WITH LOGIN; +GRANT ALL ON SCHEMA isolate_placement TO test_user_isolate_placement; +ALTER TABLE dist_1 OWNER TO test_user_isolate_placement; +ALTER TABLE dist_2 OWNER TO test_user_isolate_placement; +ALTER TABLE dist_3 OWNER TO test_user_isolate_placement; +ALTER SYSTEM SET citus.enable_manual_metadata_changes_for_user TO 'test_user_isolate_placement'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SET ROLE test_user_isolate_placement; +-- no-op .. +SELECT citus_internal_shard_property_set(:shardgroup_5_shardid, NULL); + citus_internal_shard_property_set +--------------------------------------------------------------------- + +(1 row) + +RESET ROLE; +ALTER TABLE dist_1 OWNER TO current_user; +ALTER TABLE dist_2 OWNER TO current_user; +ALTER TABLE dist_3 OWNER TO current_user; +REVOKE ALL ON SCHEMA isolate_placement FROM test_user_isolate_placement; +DROP USER test_user_isolate_placement; +ALTER SYSTEM RESET citus.enable_manual_metadata_changes_for_user; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- .. 
hence returns empty objects +SELECT result FROM run_command_on_all_nodes($$ + SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') +$$) +ORDER BY result; + result +--------------------------------------------------------------------- + {} + {} + {} +(3 rows) + SELECT citus_shard_property_set(:shardgroup_5_shardid, anti_affinity=>true); citus_shard_property_set --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 86ddd22c3..b9173b290 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1424,7 +1424,7 @@ SELECT * FROM multi_extension.print_extension_changes(); --------------------------------------------------------------------- function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text) void | | function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text,boolean) void - | function citus_internal_shard_group_set_needsseparatenode(bigint,boolean) void + | function citus_internal_shard_property_set(bigint,boolean) void | function citus_shard_property_set(bigint,boolean) void (4 rows) diff --git a/src/test/regress/expected/multi_test_helpers.out b/src/test/regress/expected/multi_test_helpers.out index 74ec1248a..ff8df7405 100644 --- a/src/test/regress/expected/multi_test_helpers.out +++ b/src/test/regress/expected/multi_test_helpers.out @@ -595,3 +595,21 @@ RETURNS SETOF jsonb AS $func$ WHERE needsseparatenodejson::text LIKE '%true%'; END; $func$ LANGUAGE plpgsql; +-- Returns true if all placement groups within given shard group are isolated. +CREATE OR REPLACE FUNCTION verify_placements_in_shard_group_isolated( + qualified_table_name text, + shard_group_index bigint) +RETURNS boolean +AS $func$ +DECLARE + v_result boolean; + BEGIN + SELECT bool_and(has_separate_node) INTO v_result + FROM citus_shards + JOIN ( + SELECT shardids FROM public.get_enumerated_shard_groups(qualified_table_name) WHERE shardgroupindex = shard_group_index + ) q + ON (shardid = ANY(q.shardids)); + RETURN v_result; + END; +$func$ LANGUAGE plpgsql; diff --git a/src/test/regress/expected/upgrade_isolate_placement_after.out b/src/test/regress/expected/upgrade_isolate_placement_after.out index 655dcde46..3ae5cb1b1 100644 --- a/src/test/regress/expected/upgrade_isolate_placement_after.out +++ b/src/test/regress/expected/upgrade_isolate_placement_after.out @@ -1,7 +1,9 @@ -- upgrade_columnar_before renames public to citus_schema and recreates public --- schema. But this file depends some helper functions created earlier within --- the original public schema, so we temporarily rename citus_schema to public --- here; and revert those changes at the end of this file. +-- schema. But this file depends on get_colocated_shards_needisolatednode() +-- function and get_colocated_shards_needisolatednode() depends on another +-- function --get_enumerated_shard_groups()-- that is presumably created earlier +-- within the original public schema, so we temporarily rename citus_schema to +-- public here; and revert those changes at the end of this file. 
ALTER SCHEMA public RENAME TO old_public; ALTER SCHEMA citus_schema RENAME TO public; SELECT result FROM run_command_on_all_nodes($$ diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 4a96bdd43..6eeb55a20 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -80,7 +80,7 @@ ORDER BY 1; function citus_internal_is_replication_origin_tracking_active() function citus_internal_local_blocked_processes() function citus_internal_mark_node_not_synced(integer,integer) - function citus_internal_shard_group_set_needsseparatenode(bigint,boolean) + function citus_internal_shard_property_set(bigint,boolean) function citus_internal_start_replication_origin_tracking() function citus_internal_stop_replication_origin_tracking() function citus_internal_unregister_tenant_schema_globally(oid,text) diff --git a/src/test/regress/sql/isolate_placement.sql b/src/test/regress/sql/isolate_placement.sql index eb3bd5961..07d7b58dd 100644 --- a/src/test/regress/sql/isolate_placement.sql +++ b/src/test/regress/sql/isolate_placement.sql @@ -9,33 +9,11 @@ SET client_min_messages TO WARNING; CALL citus_cleanup_orphaned_resources(); RESET client_min_messages; --- Returns true if all placement groups within given shard group are isolated. --- --- Not created in isolate_placement schema because it's dropped a few times during the test. -CREATE OR REPLACE FUNCTION verify_placements_in_shard_group_isolated( - qualified_table_name text, - shard_group_index bigint) -RETURNS boolean -AS $func$ -DECLARE - v_result boolean; - BEGIN - SELECT bool_and(has_separate_node) INTO v_result - FROM citus_shards - JOIN ( - SELECT shardids FROM public.get_enumerated_shard_groups(qualified_table_name) WHERE shardgroupindex = shard_group_index - ) q - ON (shardid = ANY(q.shardids)); - RETURN v_result; - END; -$func$ LANGUAGE plpgsql; - CREATE SCHEMA isolate_placement; SET search_path TO isolate_placement; -- test null input -SELECT citus_internal_shard_group_set_needsseparatenode(0, NULL); -SELECT citus_internal_shard_group_set_needsseparatenode(NULL, false); +SELECT citus_internal_shard_property_set(NULL, false); SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 2000000; @@ -44,7 +22,7 @@ CREATE TABLE single_shard_1(a int); SELECT create_distributed_table('single_shard_1', null, colocate_with=>'none'); -- test with user that doesn't have permission to execute the function -SELECT citus_internal_shard_group_set_needsseparatenode(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; +SELECT citus_internal_shard_property_set(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; DROP TABLE single_shard_1; @@ -56,7 +34,7 @@ SELECT pg_sleep(0.1); SET ROLE test_user_isolate_placement; -- test invalid shard id -SELECT citus_internal_shard_group_set_needsseparatenode(0, true); +SELECT citus_internal_shard_property_set(0, true); -- test null needs_separate_node SELECT citus_internal_add_shard_metadata( @@ -95,7 +73,7 @@ SET ROLE regularuser; -- throws an error as the user is not the owner of the table SELECT citus_shard_property_set(shardid) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; -SELECT 
citus_internal_shard_group_set_needsseparatenode(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; +SELECT citus_internal_shard_property_set(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; -- assign all tables to regularuser RESET ROLE; @@ -110,14 +88,14 @@ SELECT result FROM run_command_on_all_nodes($$ $$) ORDER BY result; -SELECT citus_internal_shard_group_set_needsseparatenode(shardid, false) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; +SELECT citus_internal_shard_property_set(shardid, false) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; SELECT result FROM run_command_on_all_nodes($$ SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.single_shard_1') $$) ORDER BY result; -SELECT citus_internal_shard_group_set_needsseparatenode(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; +SELECT citus_internal_shard_property_set(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; DROP TABLE single_shard_1; RESET ROLE; @@ -158,6 +136,38 @@ SELECT shardids[2] AS shardgroup_5_shardid FROM public.get_enumerated_shard_groups('isolate_placement.dist_1') WHERE shardgroupindex = 5 \gset +-- no-op .. +SELECT citus_shard_property_set(:shardgroup_5_shardid, NULL); + +CREATE ROLE test_user_isolate_placement WITH LOGIN; +GRANT ALL ON SCHEMA isolate_placement TO test_user_isolate_placement; +ALTER TABLE dist_1 OWNER TO test_user_isolate_placement; +ALTER TABLE dist_2 OWNER TO test_user_isolate_placement; +ALTER TABLE dist_3 OWNER TO test_user_isolate_placement; +ALTER SYSTEM SET citus.enable_manual_metadata_changes_for_user TO 'test_user_isolate_placement'; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); +SET ROLE test_user_isolate_placement; + +-- no-op .. +SELECT citus_internal_shard_property_set(:shardgroup_5_shardid, NULL); + +RESET ROLE; +ALTER TABLE dist_1 OWNER TO current_user; +ALTER TABLE dist_2 OWNER TO current_user; +ALTER TABLE dist_3 OWNER TO current_user; +REVOKE ALL ON SCHEMA isolate_placement FROM test_user_isolate_placement; +DROP USER test_user_isolate_placement; +ALTER SYSTEM RESET citus.enable_manual_metadata_changes_for_user; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); + +-- .. hence returns empty objects +SELECT result FROM run_command_on_all_nodes($$ + SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1') +$$) +ORDER BY result; + SELECT citus_shard_property_set(:shardgroup_5_shardid, anti_affinity=>true); SELECT shardids[3] AS shardgroup_10_shardid diff --git a/src/test/regress/sql/multi_test_helpers.sql b/src/test/regress/sql/multi_test_helpers.sql index c7d375112..d48a7e5f3 100644 --- a/src/test/regress/sql/multi_test_helpers.sql +++ b/src/test/regress/sql/multi_test_helpers.sql @@ -623,3 +623,22 @@ RETURNS SETOF jsonb AS $func$ WHERE needsseparatenodejson::text LIKE '%true%'; END; $func$ LANGUAGE plpgsql; + +-- Returns true if all placement groups within given shard group are isolated. 
+CREATE OR REPLACE FUNCTION verify_placements_in_shard_group_isolated( + qualified_table_name text, + shard_group_index bigint) +RETURNS boolean +AS $func$ +DECLARE + v_result boolean; + BEGIN + SELECT bool_and(has_separate_node) INTO v_result + FROM citus_shards + JOIN ( + SELECT shardids FROM public.get_enumerated_shard_groups(qualified_table_name) WHERE shardgroupindex = shard_group_index + ) q + ON (shardid = ANY(q.shardids)); + RETURN v_result; + END; +$func$ LANGUAGE plpgsql; diff --git a/src/test/regress/sql/upgrade_isolate_placement_after.sql b/src/test/regress/sql/upgrade_isolate_placement_after.sql index b096ee9aa..5ad9d3f40 100644 --- a/src/test/regress/sql/upgrade_isolate_placement_after.sql +++ b/src/test/regress/sql/upgrade_isolate_placement_after.sql @@ -1,7 +1,9 @@ -- upgrade_columnar_before renames public to citus_schema and recreates public --- schema. But this file depends some helper functions created earlier within --- the original public schema, so we temporarily rename citus_schema to public --- here; and revert those changes at the end of this file. +-- schema. But this file depends on get_colocated_shards_needisolatednode() +-- function and get_colocated_shards_needisolatednode() depends on another +-- function --get_enumerated_shard_groups()-- that is presumably created earlier +-- within the original public schema, so we temporarily rename citus_schema to +-- public here; and revert those changes at the end of this file. ALTER SCHEMA public RENAME TO old_public; ALTER SCHEMA citus_schema RENAME TO public;
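
For reviewers, a minimal usage sketch of the renamed UDFs, stitched together from the calls that isolate_placement.sql and multi_test_helpers.sql exercise above. The table name (example_dist), its shard-group index, and the psql variable are illustrative assumptions; the functions shown (create_distributed_table, get_enumerated_shard_groups, citus_shard_property_set, citus_internal_shard_property_set, verify_placements_in_shard_group_isolated) all come from this patch or the existing test helpers.

-- illustrative table; any hash-distributed or single-shard table works
SET citus.shard_replication_factor TO 1;
CREATE TABLE example_dist(a int);
SELECT create_distributed_table('example_dist', 'a', colocate_with=>'none');

-- pick one shard id out of the second shard group, as the tests above do
SELECT shardids[1] AS example_shardid
FROM public.get_enumerated_shard_groups('public.example_dist')
WHERE shardgroupindex = 2 \gset

-- user-facing UDF: NULL anti_affinity is a no-op, while true/false updates
-- needsseparatenode for every shard in the colocated shard group
SELECT citus_shard_property_set(:example_shardid, NULL);
SELECT citus_shard_property_set(:example_shardid, anti_affinity=>true);

-- internal UDF with the same NULL-means-unchanged semantics; outside
-- coordinator-initiated operations it is only callable when
-- citus.enable_manual_metadata_changes_for_user permits it, as the test sets up
SELECT citus_internal_shard_property_set(:example_shardid, NULL);

-- helper moved into multi_test_helpers.sql by this patch: returns true once all
-- placements of the shard group report has_separate_node in citus_shards
SELECT public.verify_placements_in_shard_group_isolated('public.example_dist', 2);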