address feedback

tenant-schema-isolation-complete-view
Onur Tirtir 2023-10-31 14:33:36 +03:00
parent d4391f2425
commit deca9778db
21 changed files with 283 additions and 171 deletions

View File

@ -179,7 +179,7 @@ PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema); PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema);
PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema); PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema);
PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata); PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata);
PG_FUNCTION_INFO_V1(citus_internal_shard_group_set_needsseparatenode); PG_FUNCTION_INFO_V1(citus_internal_shard_property_set);
static bool got_SIGTERM = false; static bool got_SIGTERM = false;
@ -3902,21 +3902,18 @@ citus_internal_update_none_dist_table_metadata(PG_FUNCTION_ARGS)
/* /*
* citus_internal_shard_group_set_needsseparatenode is an internal UDF to * citus_internal_shard_property_set is an internal UDF to
* set needsseparatenode flag for all the shards within the shard group * set shard properties for all the shards within the shard group
* that given shard belongs to. * that given shard belongs to.
*/ */
Datum Datum
citus_internal_shard_group_set_needsseparatenode(PG_FUNCTION_ARGS) citus_internal_shard_property_set(PG_FUNCTION_ARGS)
{ {
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
PG_ENSURE_ARGNOTNULL(0, "shard_id"); PG_ENSURE_ARGNOTNULL(0, "shard_id");
uint64 shardId = PG_GETARG_INT64(0); uint64 shardId = PG_GETARG_INT64(0);
PG_ENSURE_ARGNOTNULL(1, "enabled");
bool enabled = PG_GETARG_BOOL(1);
/* only owner of the table (or superuser) is allowed to modify the Citus metadata */ /* only owner of the table (or superuser) is allowed to modify the Citus metadata */
Oid distributedRelationId = RelationIdForShard(shardId); Oid distributedRelationId = RelationIdForShard(shardId);
EnsureTableOwner(distributedRelationId); EnsureTableOwner(distributedRelationId);
@ -3929,7 +3926,15 @@ citus_internal_shard_group_set_needsseparatenode(PG_FUNCTION_ARGS)
EnsureCoordinatorInitiatedOperation(); EnsureCoordinatorInitiatedOperation();
} }
ShardGroupSetNeedsSeparateNode(shardId, enabled); bool *needsSeparateNodePtr = NULL;
if (!PG_ARGISNULL(1))
{
needsSeparateNodePtr = palloc(sizeof(bool));
*needsSeparateNodePtr = PG_GETARG_BOOL(1);
}
ShardgroupSetProperty(shardId, needsSeparateNodePtr);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -4135,16 +4140,18 @@ UpdateNoneDistTableMetadataCommand(Oid relationId, char replicationModel,
/* /*
* ShardGroupSetNeedsSeparateNodeCommand returns a command to call * ShardgroupSetPropertyCommand returns a command to call
* citus_internal_shard_group_set_needsseparatenode(). * citus_internal_shard_property_set().
*/ */
char * char *
ShardGroupSetNeedsSeparateNodeCommand(uint64 shardId, bool enabled) ShardgroupSetPropertyCommand(uint64 shardId, bool *needsSeparateNodePtr)
{ {
char *needsSeparateNodeStr = !needsSeparateNodePtr ? "null" :
(*needsSeparateNodePtr ? "true" : "false");
StringInfo command = makeStringInfo(); StringInfo command = makeStringInfo();
appendStringInfo(command, appendStringInfo(command,
"SELECT pg_catalog.citus_internal_shard_group_set_needsseparatenode(%lu, %s)", "SELECT pg_catalog.citus_internal_shard_property_set(%lu, %s)",
shardId, enabled ? "true" : "false"); shardId, needsSeparateNodeStr);
return command->data; return command->data;
} }

View File

@ -115,9 +115,7 @@ static void AppendShardIdNameValues(StringInfo selectQuery, ShardInterval *shard
static HeapTuple CreateDiskSpaceTuple(TupleDesc tupleDesc, uint64 availableBytes, static HeapTuple CreateDiskSpaceTuple(TupleDesc tupleDesc, uint64 availableBytes,
uint64 totalBytes); uint64 totalBytes);
static bool GetLocalDiskSpaceStats(uint64 *availableBytes, uint64 *totalBytes); static bool GetLocalDiskSpaceStats(uint64 *availableBytes, uint64 *totalBytes);
static void CitusShardPropertySetAntiAffinity(uint64 shardId, bool enabled); static void ShardgroupSetPropertyGlobally(uint64 shardId, bool *needsSeparateNodePtr);
static void ShardGroupSetNeedsSeparateNodeGlobally(uint64 shardId, bool enabled);
static void ShardSetNeedsSeparateNode(uint64 shardId, bool enabled);
static BackgroundTask * DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, static BackgroundTask * DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor,
HeapTuple taskTuple); HeapTuple taskTuple);
@ -386,27 +384,17 @@ citus_shard_property_set(PG_FUNCTION_ARGS)
Oid colocatedTableId = InvalidOid; Oid colocatedTableId = InvalidOid;
foreach_oid(colocatedTableId, colocatedTableList) foreach_oid(colocatedTableId, colocatedTableList)
{ {
LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock); /*
* Prevent relations from being dropped while we are setting the
* property.
*/
LockRelationOid(colocatedTableId, AccessShareLock);
} }
bool *needsSeparateNodePtr = NULL;
if (!PG_ARGISNULL(1)) if (!PG_ARGISNULL(1))
{ {
bool antiAffinity = PG_GETARG_BOOL(1);
CitusShardPropertySetAntiAffinity(shardId, antiAffinity);
}
PG_RETURN_VOID();
}
/*
* CitusShardPropertySetAntiAffinity is an helper function for
* citus_shard_property_set UDF to set anti_affinity property for given
* shard.
*/
static void
CitusShardPropertySetAntiAffinity(uint64 shardId, bool enabled)
{
Oid distributedRelationId = RelationIdForShard(shardId); Oid distributedRelationId = RelationIdForShard(shardId);
if (!IsCitusTableType(distributedRelationId, HASH_DISTRIBUTED) && if (!IsCitusTableType(distributedRelationId, HASH_DISTRIBUTED) &&
!IsCitusTableType(distributedRelationId, SINGLE_SHARD_DISTRIBUTED)) !IsCitusTableType(distributedRelationId, SINGLE_SHARD_DISTRIBUTED))
@ -415,58 +403,62 @@ CitusShardPropertySetAntiAffinity(uint64 shardId, bool enabled)
"supported for hash distributed tables"))); "supported for hash distributed tables")));
} }
ShardGroupSetNeedsSeparateNodeGlobally(shardId, enabled); needsSeparateNodePtr = palloc(sizeof(bool));
*needsSeparateNodePtr = PG_GETARG_BOOL(1);
}
ShardgroupSetPropertyGlobally(shardId, needsSeparateNodePtr);
PG_RETURN_VOID();
} }
/* /*
* ShardGroupSetNeedsSeparateNodeGlobally calls ShardGroupSetNeedsSeparateNode * ShardgroupSetPropertyGlobally calls ShardgroupSetProperty
* on all nodes. * on all nodes.
*/ */
static void static void
ShardGroupSetNeedsSeparateNodeGlobally(uint64 shardId, bool enabled) ShardgroupSetPropertyGlobally(uint64 shardId, bool *needsSeparateNodePtr)
{ {
ShardGroupSetNeedsSeparateNode(shardId, enabled); ShardgroupSetProperty(shardId, needsSeparateNodePtr);
char *metadataCommand = char *metadataCommand =
ShardGroupSetNeedsSeparateNodeCommand(shardId, enabled); ShardgroupSetPropertyCommand(shardId, needsSeparateNodePtr);
SendCommandToWorkersWithMetadata(metadataCommand); SendCommandToWorkersWithMetadata(metadataCommand);
} }
/* /*
* ShardGroupSetNeedsSeparateNode sets the needsseparatenode flag to desired * ShardgroupSetProperty sets shard properties for all the shards within
* value for all the shards within the shard group that given shard belongs * the shard group that given shard belongs to.
* to.
*/ */
void void
ShardGroupSetNeedsSeparateNode(uint64 shardId, bool enabled) ShardgroupSetProperty(uint64 shardId, bool *needsSeparateNodePtr)
{ {
ShardInterval *shardInterval = LoadShardInterval(shardId); ShardInterval *shardInterval = LoadShardInterval(shardId);
List *colocatedShardIntervalList = ColocatedShardIntervalList(shardInterval); List *colocatedShardIntervalList = ColocatedShardIntervalList(shardInterval);
int nShardInterval = list_length(colocatedShardIntervalList);
Datum *shardIdDatumArray = (Datum *) palloc(nShardInterval * sizeof(Datum));
int shardIndex = 0;
ShardInterval *colocatedShardInterval = NULL; ShardInterval *colocatedShardInterval = NULL;
foreach_ptr(colocatedShardInterval, colocatedShardIntervalList) foreach_ptr(colocatedShardInterval, colocatedShardIntervalList)
{ {
ShardSetNeedsSeparateNode(colocatedShardInterval->shardId, shardIdDatumArray[shardIndex] = UInt64GetDatum(colocatedShardInterval->shardId);
enabled); shardIndex++;
} }
}
ArrayType *shardIdArrayDatum = DatumArrayToArrayType(shardIdDatumArray,
nShardInterval, INT8OID);
/*
* ShardSetNeedsSeparateNode sets the needsseparatenode flag to desired
* value for the given shard.
*/
static void
ShardSetNeedsSeparateNode(uint64 shardId, bool enabled)
{
Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock); Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock);
ScanKeyData scanKey[1]; ScanKeyData scanKey[1];
int scanKeyCount = 1; int scanKeyCount = 1;
ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid, ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId)); BTEqualStrategyNumber, F_INT8EQ, PointerGetDatum(shardIdArrayDatum));
scanKey[0].sk_flags |= SK_SEARCHARRAY;
bool indexOK = true; bool indexOK = true;
Oid indexId = DistShardShardidIndexId(); Oid indexId = DistShardShardidIndexId();
@ -474,14 +466,6 @@ ShardSetNeedsSeparateNode(uint64 shardId, bool enabled)
indexId, indexOK, NULL, indexId, indexOK, NULL,
scanKeyCount, scanKey); scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find valid entry for shard "
UINT64_FORMAT,
shardId)));
}
Datum values[Natts_pg_dist_shard]; Datum values[Natts_pg_dist_shard];
bool isnull[Natts_pg_dist_shard]; bool isnull[Natts_pg_dist_shard];
bool replace[Natts_pg_dist_shard]; bool replace[Natts_pg_dist_shard];
@ -490,16 +474,46 @@ ShardSetNeedsSeparateNode(uint64 shardId, bool enabled)
memset(isnull, false, sizeof(isnull)); memset(isnull, false, sizeof(isnull));
memset(replace, false, sizeof(replace)); memset(replace, false, sizeof(replace));
values[Anum_pg_dist_shard_needsseparatenode - 1] = BoolGetDatum(enabled); if (needsSeparateNodePtr)
{
values[Anum_pg_dist_shard_needsseparatenode - 1] = BoolGetDatum(
*needsSeparateNodePtr);
isnull[Anum_pg_dist_shard_needsseparatenode - 1] = false; isnull[Anum_pg_dist_shard_needsseparatenode - 1] = false;
replace[Anum_pg_dist_shard_needsseparatenode - 1] = true; replace[Anum_pg_dist_shard_needsseparatenode - 1] = true;
}
bool updatedAny = false;
CatalogIndexState indexState = CatalogOpenIndexes(pgDistShard);
HeapTuple heapTuple = NULL;
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
{
TupleDesc tupleDescriptor = RelationGetDescr(pgDistShard); TupleDesc tupleDescriptor = RelationGetDescr(pgDistShard);
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull,
replace);
CatalogTupleUpdate(pgDistShard, &heapTuple->t_self, heapTuple); CatalogTupleUpdateWithInfo(pgDistShard, &heapTuple->t_self, heapTuple,
indexState);
CitusInvalidateRelcacheByShardId(shardId); updatedAny = true;
}
if (!updatedAny)
{
ereport(ERROR, (errmsg("could not find valid entry for shard "
UINT64_FORMAT,
shardId)));
}
CatalogCloseIndexes(indexState);
/*
* We don't need to send invalidations for all the shards as
* CitusInvalidateRelcacheByShardId() will send the invalidation based on
* id of the belonging distributed table, not just for the input shard.
*/
CitusInvalidateRelcacheByShardId(shardInterval->shardId);
CommandCounterIncrement(); CommandCounterIncrement();

View File

@ -25,7 +25,7 @@
#include "distributed/shard_rebalancer.h" #include "distributed/shard_rebalancer.h"
struct RebalancerPlacementSeparationContext typedef struct RebalancerPlacementSeparationContext
{ {
/* /*
* Hash table where each entry is of the form NodeToPlacementGroupHashEntry, * Hash table where each entry is of the form NodeToPlacementGroupHashEntry,
@ -33,7 +33,7 @@ struct RebalancerPlacementSeparationContext
* a NodeToPlacementGroupHashEntry. * a NodeToPlacementGroupHashEntry.
*/ */
HTAB *nodePlacementGroupHash; HTAB *nodePlacementGroupHash;
}; } RebalancerPlacementSeparationContext;
/* /*
@ -73,7 +73,7 @@ typedef struct NodeToPlacementGroupHashEntry
* Shardgroup placement that is assigned to this node to be separated * Shardgroup placement that is assigned to this node to be separated
* from others in the cluster. * from others in the cluster.
* *
* NULL if no shardgroup placement is assigned yet. * NULL if no shardgroup placement is not assigned yet.
*/ */
ShardgroupPlacement *assignedPlacementGroup; ShardgroupPlacement *assignedPlacementGroup;
} NodeToPlacementGroupHashEntry; } NodeToPlacementGroupHashEntry;
@ -89,12 +89,12 @@ static void TryAssignPlacementGroupsToNodeGroups(
RebalancerPlacementSeparationContext *context, RebalancerPlacementSeparationContext *context,
List *activeWorkerNodeList, List *activeWorkerNodeList,
List *rebalancePlacementList, List *rebalancePlacementList,
FmgrInfo *shardAllowedOnNodeUDF); FmgrInfo shardAllowedOnNodeUDF);
static bool TryAssignPlacementGroupToNodeGroup( static bool TryAssignPlacementGroupToNodeGroup(
RebalancerPlacementSeparationContext *context, RebalancerPlacementSeparationContext *context,
int32 candidateNodeGroupId, int32 candidateNodeGroupId,
ShardPlacement *shardPlacement, ShardPlacement *shardPlacement,
FmgrInfo *shardAllowedOnNodeUDF); FmgrInfo shardAllowedOnNodeUDF);
/* other helpers */ /* other helpers */
@ -109,15 +109,15 @@ static List * PlacementListGetUniqueNodeGroupIds(List *placementList);
RebalancerPlacementSeparationContext * RebalancerPlacementSeparationContext *
PrepareRebalancerPlacementSeparationContext(List *activeWorkerNodeList, PrepareRebalancerPlacementSeparationContext(List *activeWorkerNodeList,
List *rebalancePlacementList, List *rebalancePlacementList,
FmgrInfo *shardAllowedOnNodeUDF) FmgrInfo shardAllowedOnNodeUDF)
{ {
HTAB *nodePlacementGroupHash = HTAB *nodePlacementGroupHash =
CreateSimpleHashWithNameAndSize(uint32, NodeToPlacementGroupHashEntry, CreateSimpleHashWithNameAndSize(int32, NodeToPlacementGroupHashEntry,
"NodeToPlacementGroupHash", "NodeToPlacementGroupHash",
list_length(activeWorkerNodeList)); list_length(activeWorkerNodeList));
RebalancerPlacementSeparationContext *context = RebalancerPlacementSeparationContext *context =
palloc(sizeof(RebalancerPlacementSeparationContext)); palloc0(sizeof(RebalancerPlacementSeparationContext));
context->nodePlacementGroupHash = nodePlacementGroupHash; context->nodePlacementGroupHash = nodePlacementGroupHash;
activeWorkerNodeList = SortList(activeWorkerNodeList, CompareWorkerNodes); activeWorkerNodeList = SortList(activeWorkerNodeList, CompareWorkerNodes);
@ -158,8 +158,7 @@ InitRebalancerPlacementSeparationContext(RebalancerPlacementSeparationContext *c
hash_search(nodePlacementGroupHash, &workerNode->groupId, HASH_ENTER, hash_search(nodePlacementGroupHash, &workerNode->groupId, HASH_ENTER,
NULL); NULL);
nodePlacementGroupHashEntry->shouldHaveShards = nodePlacementGroupHashEntry->shouldHaveShards = workerNode->shouldHaveShards;
workerNode->shouldHaveShards;
nodePlacementGroupHashEntry->hasPlacementsThatCannotBeMovedAway = false; nodePlacementGroupHashEntry->hasPlacementsThatCannotBeMovedAway = false;
nodePlacementGroupHashEntry->assignedPlacementGroup = NULL; nodePlacementGroupHashEntry->assignedPlacementGroup = NULL;
@ -219,7 +218,7 @@ static void
TryAssignPlacementGroupsToNodeGroups(RebalancerPlacementSeparationContext *context, TryAssignPlacementGroupsToNodeGroups(RebalancerPlacementSeparationContext *context,
List *activeWorkerNodeList, List *activeWorkerNodeList,
List *rebalancePlacementList, List *rebalancePlacementList,
FmgrInfo *shardAllowedOnNodeUDF) FmgrInfo shardAllowedOnNodeUDF)
{ {
List *unassignedPlacementList = NIL; List *unassignedPlacementList = NIL;
@ -294,7 +293,7 @@ static bool
TryAssignPlacementGroupToNodeGroup(RebalancerPlacementSeparationContext *context, TryAssignPlacementGroupToNodeGroup(RebalancerPlacementSeparationContext *context,
int32 candidateNodeGroupId, int32 candidateNodeGroupId,
ShardPlacement *shardPlacement, ShardPlacement *shardPlacement,
FmgrInfo *shardAllowedOnNodeUDF) FmgrInfo shardAllowedOnNodeUDF)
{ {
HTAB *nodePlacementGroupHash = context->nodePlacementGroupHash; HTAB *nodePlacementGroupHash = context->nodePlacementGroupHash;
@ -330,7 +329,7 @@ TryAssignPlacementGroupToNodeGroup(RebalancerPlacementSeparationContext *context
} }
WorkerNode *workerNode = PrimaryNodeForGroup(candidateNodeGroupId, NULL); WorkerNode *workerNode = PrimaryNodeForGroup(candidateNodeGroupId, NULL);
Datum allowed = FunctionCall2(shardAllowedOnNodeUDF, shardPlacement->shardId, Datum allowed = FunctionCall2(&shardAllowedOnNodeUDF, shardPlacement->shardId,
workerNode->nodeId); workerNode->nodeId);
if (!DatumGetBool(allowed)) if (!DatumGetBool(allowed))
{ {

View File

@ -598,7 +598,7 @@ GetRebalanceSteps(RebalanceOptions *options)
PrepareRebalancerPlacementSeparationContext( PrepareRebalancerPlacementSeparationContext(
activeWorkerList, activeWorkerList,
FlattenNestedList(activeShardPlacementListList), FlattenNestedList(activeShardPlacementListList),
&context.shardAllowedOnNodeUDF); context.shardAllowedOnNodeUDF);
return RebalancePlacementUpdates(activeWorkerList, return RebalancePlacementUpdates(activeWorkerList,
activeShardPlacementListList, activeShardPlacementListList,

View File

@ -9,7 +9,7 @@ ALTER TABLE pg_dist_shard ADD COLUMN needsseparatenode boolean NOT NULL DEFAULT
DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text); DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text);
#include "udfs/citus_internal_add_shard_metadata/12.2-1.sql" #include "udfs/citus_internal_add_shard_metadata/12.2-1.sql"
#include "udfs/citus_internal_shard_group_set_needsseparatenode/12.2-1.sql" #include "udfs/citus_internal_shard_property_set/12.2-1.sql"
#include "udfs/citus_shard_property_set/12.2-1.sql" #include "udfs/citus_shard_property_set/12.2-1.sql"
DROP VIEW citus_shards; DROP VIEW citus_shards;

View File

@ -10,8 +10,8 @@ DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "ch
DROP FUNCTION pg_catalog.citus_shard_property_set(shard_id bigint, anti_affinity boolean); DROP FUNCTION pg_catalog.citus_shard_property_set(shard_id bigint, anti_affinity boolean);
DROP FUNCTION pg_catalog.citus_internal_shard_group_set_needsseparatenode( DROP FUNCTION pg_catalog.citus_internal_shard_property_set(
shard_id bigint, shard_id bigint,
enabled boolean); needs_separate_node boolean);
ALTER TABLE pg_dist_shard DROP COLUMN needsseparatenode; ALTER TABLE pg_dist_shard DROP COLUMN needsseparatenode;

View File

@ -1,6 +0,0 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_shard_group_set_needsseparatenode(
shard_id bigint,
enabled boolean)
RETURNS void
LANGUAGE C VOLATILE
AS 'MODULE_PATHNAME', $$citus_internal_shard_group_set_needsseparatenode$$;

View File

@ -1,6 +0,0 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_shard_group_set_needsseparatenode(
shard_id bigint,
enabled boolean)
RETURNS void
LANGUAGE C VOLATILE
AS 'MODULE_PATHNAME', $$citus_internal_shard_group_set_needsseparatenode$$;

View File

@ -0,0 +1,6 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_shard_property_set(
shard_id bigint,
needs_separate_node boolean)
RETURNS void
LANGUAGE C VOLATILE
AS 'MODULE_PATHNAME', $$citus_internal_shard_property_set$$;

View File

@ -0,0 +1,6 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_shard_property_set(
shard_id bigint,
needs_separate_node boolean)
RETURNS void
LANGUAGE C VOLATILE
AS 'MODULE_PATHNAME', $$citus_internal_shard_property_set$$;

View File

@ -141,7 +141,7 @@ extern char * TenantSchemaInsertCommand(Oid schemaId, uint32 colocationId);
extern char * TenantSchemaDeleteCommand(char *schemaName); extern char * TenantSchemaDeleteCommand(char *schemaName);
extern char * UpdateNoneDistTableMetadataCommand(Oid relationId, char replicationModel, extern char * UpdateNoneDistTableMetadataCommand(Oid relationId, char replicationModel,
uint32 colocationId, bool autoConverted); uint32 colocationId, bool autoConverted);
extern char * ShardGroupSetNeedsSeparateNodeCommand(uint64 shardId, bool enabled); extern char * ShardgroupSetPropertyCommand(uint64 shardId, bool *needsSeparateNodePtr);
extern char * AddPlacementMetadataCommand(uint64 shardId, uint64 placementId, extern char * AddPlacementMetadataCommand(uint64 shardId, uint64 placementId,
uint64 shardLength, int32 groupId); uint64 shardLength, int32 groupId);
extern char * DeletePlacementMetadataCommand(uint64 placementId); extern char * DeletePlacementMetadataCommand(uint64 placementId);

View File

@ -447,7 +447,7 @@ extern List * SendShardStatisticsQueriesInParallel(List *citusTableIds,
extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection, extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection,
uint64 *availableBytes, uint64 *availableBytes,
uint64 *totalBytes); uint64 *totalBytes);
extern void ShardGroupSetNeedsSeparateNode(uint64 shardId, bool enabled); extern void ShardgroupSetProperty(uint64 shardId, bool *needsSeparateNodePtr);
extern void ExecuteQueryViaSPI(char *query, int SPIOK); extern void ExecuteQueryViaSPI(char *query, int SPIOK);
extern void ExecuteAndLogQueryViaSPI(char *query, int SPIOK, int logLevel); extern void ExecuteAndLogQueryViaSPI(char *query, int SPIOK, int logLevel);
extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid

View File

@ -27,7 +27,6 @@ extern RebalancerPlacementSeparationContext * PrepareRebalancerPlacementSeparati
* *
activeShardPlacementList, activeShardPlacementList,
FmgrInfo FmgrInfo
*
shardAllowedOnNodeUDF); shardAllowedOnNodeUDF);
extern bool RebalancerPlacementSeparationContextPlacementIsAllowedOnWorker( extern bool RebalancerPlacementSeparationContextPlacementIsAllowedOnWorker(
RebalancerPlacementSeparationContext *context, RebalancerPlacementSeparationContext *context,

View File

@ -7,32 +7,10 @@
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
CALL citus_cleanup_orphaned_resources(); CALL citus_cleanup_orphaned_resources();
RESET client_min_messages; RESET client_min_messages;
-- Returns true if all placement groups within given shard group are isolated.
--
-- Not created in isolate_placement schema because it's dropped a few times during the test.
CREATE OR REPLACE FUNCTION verify_placements_in_shard_group_isolated(
qualified_table_name text,
shard_group_index bigint)
RETURNS boolean
AS $func$
DECLARE
v_result boolean;
BEGIN
SELECT bool_and(has_separate_node) INTO v_result
FROM citus_shards
JOIN (
SELECT shardids FROM public.get_enumerated_shard_groups(qualified_table_name) WHERE shardgroupindex = shard_group_index
) q
ON (shardid = ANY(q.shardids));
RETURN v_result;
END;
$func$ LANGUAGE plpgsql;
CREATE SCHEMA isolate_placement; CREATE SCHEMA isolate_placement;
SET search_path TO isolate_placement; SET search_path TO isolate_placement;
-- test null input -- test null input
SELECT citus_internal_shard_group_set_needsseparatenode(0, NULL); SELECT citus_internal_shard_property_set(NULL, false);
ERROR: enabled cannot be NULL
SELECT citus_internal_shard_group_set_needsseparatenode(NULL, false);
ERROR: shard_id cannot be NULL ERROR: shard_id cannot be NULL
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 2000000; SET citus.next_shard_id TO 2000000;
@ -44,7 +22,7 @@ SELECT create_distributed_table('single_shard_1', null, colocate_with=>'none');
(1 row) (1 row)
-- test with user that doesn't have permission to execute the function -- test with user that doesn't have permission to execute the function
SELECT citus_internal_shard_group_set_needsseparatenode(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; SELECT citus_internal_shard_property_set(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass;
ERROR: This is an internal Citus function can only be used in a distributed transaction ERROR: This is an internal Citus function can only be used in a distributed transaction
DROP TABLE single_shard_1; DROP TABLE single_shard_1;
CREATE ROLE test_user_isolate_placement WITH LOGIN; CREATE ROLE test_user_isolate_placement WITH LOGIN;
@ -64,7 +42,7 @@ SELECT pg_sleep(0.1);
SET ROLE test_user_isolate_placement; SET ROLE test_user_isolate_placement;
-- test invalid shard id -- test invalid shard id
SELECT citus_internal_shard_group_set_needsseparatenode(0, true); SELECT citus_internal_shard_property_set(0, true);
ERROR: could not find valid entry for shard xxxxx ERROR: could not find valid entry for shard xxxxx
-- test null needs_separate_node -- test null needs_separate_node
SELECT citus_internal_add_shard_metadata( SELECT citus_internal_add_shard_metadata(
@ -124,7 +102,7 @@ SELECT citus_shard_property_set(shardid) FROM pg_dist_shard WHERE logicalrelid =
ERROR: must be owner of table single_shard_1 ERROR: must be owner of table single_shard_1
SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass;
ERROR: must be owner of table single_shard_1 ERROR: must be owner of table single_shard_1
SELECT citus_internal_shard_group_set_needsseparatenode(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; SELECT citus_internal_shard_property_set(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass;
ERROR: must be owner of table single_shard_1 ERROR: must be owner of table single_shard_1
-- assign all tables to regularuser -- assign all tables to regularuser
RESET ROLE; RESET ROLE;
@ -154,8 +132,8 @@ ORDER BY result;
[{"1": [{"isolate_placement.single_shard_1": true}]}] [{"1": [{"isolate_placement.single_shard_1": true}]}]
(3 rows) (3 rows)
SELECT citus_internal_shard_group_set_needsseparatenode(shardid, false) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; SELECT citus_internal_shard_property_set(shardid, false) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass;
citus_internal_shard_group_set_needsseparatenode citus_internal_shard_property_set
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
@ -171,8 +149,8 @@ ORDER BY result;
{} {}
(3 rows) (3 rows)
SELECT citus_internal_shard_group_set_needsseparatenode(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; SELECT citus_internal_shard_property_set(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass;
citus_internal_shard_group_set_needsseparatenode citus_internal_shard_property_set
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
@ -244,6 +222,70 @@ ORDER BY result;
SELECT shardids[2] AS shardgroup_5_shardid SELECT shardids[2] AS shardgroup_5_shardid
FROM public.get_enumerated_shard_groups('isolate_placement.dist_1') FROM public.get_enumerated_shard_groups('isolate_placement.dist_1')
WHERE shardgroupindex = 5 \gset WHERE shardgroupindex = 5 \gset
-- no-op ..
SELECT citus_shard_property_set(:shardgroup_5_shardid, NULL);
citus_shard_property_set
---------------------------------------------------------------------
(1 row)
CREATE ROLE test_user_isolate_placement WITH LOGIN;
GRANT ALL ON SCHEMA isolate_placement TO test_user_isolate_placement;
ALTER TABLE dist_1 OWNER TO test_user_isolate_placement;
ALTER TABLE dist_2 OWNER TO test_user_isolate_placement;
ALTER TABLE dist_3 OWNER TO test_user_isolate_placement;
ALTER SYSTEM SET citus.enable_manual_metadata_changes_for_user TO 'test_user_isolate_placement';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
SET ROLE test_user_isolate_placement;
-- no-op ..
SELECT citus_internal_shard_property_set(:shardgroup_5_shardid, NULL);
citus_internal_shard_property_set
---------------------------------------------------------------------
(1 row)
RESET ROLE;
ALTER TABLE dist_1 OWNER TO current_user;
ALTER TABLE dist_2 OWNER TO current_user;
ALTER TABLE dist_3 OWNER TO current_user;
REVOKE ALL ON SCHEMA isolate_placement FROM test_user_isolate_placement;
DROP USER test_user_isolate_placement;
ALTER SYSTEM RESET citus.enable_manual_metadata_changes_for_user;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- .. hence returns empty objects
SELECT result FROM run_command_on_all_nodes($$
SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1')
$$)
ORDER BY result;
result
---------------------------------------------------------------------
{}
{}
{}
(3 rows)
SELECT citus_shard_property_set(:shardgroup_5_shardid, anti_affinity=>true); SELECT citus_shard_property_set(:shardgroup_5_shardid, anti_affinity=>true);
citus_shard_property_set citus_shard_property_set
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -1424,7 +1424,7 @@ SELECT * FROM multi_extension.print_extension_changes();
--------------------------------------------------------------------- ---------------------------------------------------------------------
function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text) void | function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text) void |
| function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text,boolean) void | function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text,boolean) void
| function citus_internal_shard_group_set_needsseparatenode(bigint,boolean) void | function citus_internal_shard_property_set(bigint,boolean) void
| function citus_shard_property_set(bigint,boolean) void | function citus_shard_property_set(bigint,boolean) void
(4 rows) (4 rows)

View File

@ -595,3 +595,21 @@ RETURNS SETOF jsonb AS $func$
WHERE needsseparatenodejson::text LIKE '%true%'; WHERE needsseparatenodejson::text LIKE '%true%';
END; END;
$func$ LANGUAGE plpgsql; $func$ LANGUAGE plpgsql;
-- Returns true if all placement groups within given shard group are isolated.
CREATE OR REPLACE FUNCTION verify_placements_in_shard_group_isolated(
qualified_table_name text,
shard_group_index bigint)
RETURNS boolean
AS $func$
DECLARE
v_result boolean;
BEGIN
SELECT bool_and(has_separate_node) INTO v_result
FROM citus_shards
JOIN (
SELECT shardids FROM public.get_enumerated_shard_groups(qualified_table_name) WHERE shardgroupindex = shard_group_index
) q
ON (shardid = ANY(q.shardids));
RETURN v_result;
END;
$func$ LANGUAGE plpgsql;

View File

@ -1,7 +1,9 @@
-- upgrade_columnar_before renames public to citus_schema and recreates public -- upgrade_columnar_before renames public to citus_schema and recreates public
-- schema. But this file depends some helper functions created earlier within -- schema. But this file depends on get_colocated_shards_needisolatednode()
-- the original public schema, so we temporarily rename citus_schema to public -- function and get_colocated_shards_needisolatednode() depends on another
-- here; and revert those changes at the end of this file. -- function --get_enumerated_shard_groups()-- that is presumably created earlier
-- within the original public schema, so we temporarily rename citus_schema to
-- public here; and revert those changes at the end of this file.
ALTER SCHEMA public RENAME TO old_public; ALTER SCHEMA public RENAME TO old_public;
ALTER SCHEMA citus_schema RENAME TO public; ALTER SCHEMA citus_schema RENAME TO public;
SELECT result FROM run_command_on_all_nodes($$ SELECT result FROM run_command_on_all_nodes($$

View File

@ -80,7 +80,7 @@ ORDER BY 1;
function citus_internal_is_replication_origin_tracking_active() function citus_internal_is_replication_origin_tracking_active()
function citus_internal_local_blocked_processes() function citus_internal_local_blocked_processes()
function citus_internal_mark_node_not_synced(integer,integer) function citus_internal_mark_node_not_synced(integer,integer)
function citus_internal_shard_group_set_needsseparatenode(bigint,boolean) function citus_internal_shard_property_set(bigint,boolean)
function citus_internal_start_replication_origin_tracking() function citus_internal_start_replication_origin_tracking()
function citus_internal_stop_replication_origin_tracking() function citus_internal_stop_replication_origin_tracking()
function citus_internal_unregister_tenant_schema_globally(oid,text) function citus_internal_unregister_tenant_schema_globally(oid,text)

View File

@ -9,33 +9,11 @@ SET client_min_messages TO WARNING;
CALL citus_cleanup_orphaned_resources(); CALL citus_cleanup_orphaned_resources();
RESET client_min_messages; RESET client_min_messages;
-- Returns true if all placement groups within given shard group are isolated.
--
-- Not created in isolate_placement schema because it's dropped a few times during the test.
CREATE OR REPLACE FUNCTION verify_placements_in_shard_group_isolated(
qualified_table_name text,
shard_group_index bigint)
RETURNS boolean
AS $func$
DECLARE
v_result boolean;
BEGIN
SELECT bool_and(has_separate_node) INTO v_result
FROM citus_shards
JOIN (
SELECT shardids FROM public.get_enumerated_shard_groups(qualified_table_name) WHERE shardgroupindex = shard_group_index
) q
ON (shardid = ANY(q.shardids));
RETURN v_result;
END;
$func$ LANGUAGE plpgsql;
CREATE SCHEMA isolate_placement; CREATE SCHEMA isolate_placement;
SET search_path TO isolate_placement; SET search_path TO isolate_placement;
-- test null input -- test null input
SELECT citus_internal_shard_group_set_needsseparatenode(0, NULL); SELECT citus_internal_shard_property_set(NULL, false);
SELECT citus_internal_shard_group_set_needsseparatenode(NULL, false);
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 2000000; SET citus.next_shard_id TO 2000000;
@ -44,7 +22,7 @@ CREATE TABLE single_shard_1(a int);
SELECT create_distributed_table('single_shard_1', null, colocate_with=>'none'); SELECT create_distributed_table('single_shard_1', null, colocate_with=>'none');
-- test with user that doesn't have permission to execute the function -- test with user that doesn't have permission to execute the function
SELECT citus_internal_shard_group_set_needsseparatenode(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; SELECT citus_internal_shard_property_set(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass;
DROP TABLE single_shard_1; DROP TABLE single_shard_1;
@ -56,7 +34,7 @@ SELECT pg_sleep(0.1);
SET ROLE test_user_isolate_placement; SET ROLE test_user_isolate_placement;
-- test invalid shard id -- test invalid shard id
SELECT citus_internal_shard_group_set_needsseparatenode(0, true); SELECT citus_internal_shard_property_set(0, true);
-- test null needs_separate_node -- test null needs_separate_node
SELECT citus_internal_add_shard_metadata( SELECT citus_internal_add_shard_metadata(
@ -95,7 +73,7 @@ SET ROLE regularuser;
-- throws an error as the user is not the owner of the table -- throws an error as the user is not the owner of the table
SELECT citus_shard_property_set(shardid) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; SELECT citus_shard_property_set(shardid) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass;
SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass;
SELECT citus_internal_shard_group_set_needsseparatenode(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; SELECT citus_internal_shard_property_set(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass;
-- assign all tables to regularuser -- assign all tables to regularuser
RESET ROLE; RESET ROLE;
@ -110,14 +88,14 @@ SELECT result FROM run_command_on_all_nodes($$
$$) $$)
ORDER BY result; ORDER BY result;
SELECT citus_internal_shard_group_set_needsseparatenode(shardid, false) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; SELECT citus_internal_shard_property_set(shardid, false) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass;
SELECT result FROM run_command_on_all_nodes($$ SELECT result FROM run_command_on_all_nodes($$
SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.single_shard_1') SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.single_shard_1')
$$) $$)
ORDER BY result; ORDER BY result;
SELECT citus_internal_shard_group_set_needsseparatenode(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass; SELECT citus_internal_shard_property_set(shardid, true) FROM pg_dist_shard WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass;
DROP TABLE single_shard_1; DROP TABLE single_shard_1;
RESET ROLE; RESET ROLE;
@ -158,6 +136,38 @@ SELECT shardids[2] AS shardgroup_5_shardid
FROM public.get_enumerated_shard_groups('isolate_placement.dist_1') FROM public.get_enumerated_shard_groups('isolate_placement.dist_1')
WHERE shardgroupindex = 5 \gset WHERE shardgroupindex = 5 \gset
-- no-op ..
SELECT citus_shard_property_set(:shardgroup_5_shardid, NULL);
CREATE ROLE test_user_isolate_placement WITH LOGIN;
GRANT ALL ON SCHEMA isolate_placement TO test_user_isolate_placement;
ALTER TABLE dist_1 OWNER TO test_user_isolate_placement;
ALTER TABLE dist_2 OWNER TO test_user_isolate_placement;
ALTER TABLE dist_3 OWNER TO test_user_isolate_placement;
ALTER SYSTEM SET citus.enable_manual_metadata_changes_for_user TO 'test_user_isolate_placement';
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
SET ROLE test_user_isolate_placement;
-- no-op ..
SELECT citus_internal_shard_property_set(:shardgroup_5_shardid, NULL);
RESET ROLE;
ALTER TABLE dist_1 OWNER TO current_user;
ALTER TABLE dist_2 OWNER TO current_user;
ALTER TABLE dist_3 OWNER TO current_user;
REVOKE ALL ON SCHEMA isolate_placement FROM test_user_isolate_placement;
DROP USER test_user_isolate_placement;
ALTER SYSTEM RESET citus.enable_manual_metadata_changes_for_user;
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
-- .. hence returns empty objects
SELECT result FROM run_command_on_all_nodes($$
SELECT * FROM public.get_colocated_shards_needisolatednode('isolate_placement.dist_1')
$$)
ORDER BY result;
SELECT citus_shard_property_set(:shardgroup_5_shardid, anti_affinity=>true); SELECT citus_shard_property_set(:shardgroup_5_shardid, anti_affinity=>true);
SELECT shardids[3] AS shardgroup_10_shardid SELECT shardids[3] AS shardgroup_10_shardid

View File

@ -623,3 +623,22 @@ RETURNS SETOF jsonb AS $func$
WHERE needsseparatenodejson::text LIKE '%true%'; WHERE needsseparatenodejson::text LIKE '%true%';
END; END;
$func$ LANGUAGE plpgsql; $func$ LANGUAGE plpgsql;
-- Returns true if all placement groups within given shard group are isolated.
CREATE OR REPLACE FUNCTION verify_placements_in_shard_group_isolated(
qualified_table_name text,
shard_group_index bigint)
RETURNS boolean
AS $func$
DECLARE
v_result boolean;
BEGIN
SELECT bool_and(has_separate_node) INTO v_result
FROM citus_shards
JOIN (
SELECT shardids FROM public.get_enumerated_shard_groups(qualified_table_name) WHERE shardgroupindex = shard_group_index
) q
ON (shardid = ANY(q.shardids));
RETURN v_result;
END;
$func$ LANGUAGE plpgsql;

View File

@ -1,7 +1,9 @@
-- upgrade_columnar_before renames public to citus_schema and recreates public -- upgrade_columnar_before renames public to citus_schema and recreates public
-- schema. But this file depends some helper functions created earlier within -- schema. But this file depends on get_colocated_shards_needisolatednode()
-- the original public schema, so we temporarily rename citus_schema to public -- function and get_colocated_shards_needisolatednode() depends on another
-- here; and revert those changes at the end of this file. -- function --get_enumerated_shard_groups()-- that is presumably created earlier
-- within the original public schema, so we temporarily rename citus_schema to
-- public here; and revert those changes at the end of this file.
ALTER SCHEMA public RENAME TO old_public; ALTER SCHEMA public RENAME TO old_public;
ALTER SCHEMA citus_schema RENAME TO public; ALTER SCHEMA citus_schema RENAME TO public;