From 2d16b0fd9ed93199a2457d936ee3b9416a80685f Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Mon, 16 Oct 2023 13:21:47 +0300 Subject: [PATCH] address feedback --- .../distributed/metadata/metadata_sync.c | 38 +-- .../distributed/metadata/metadata_utility.c | 129 ++++++---- ...on.c => rebalancer_placement_separation.c} | 233 +++++++++--------- .../distributed/operations/shard_rebalancer.c | 14 +- .../operations/worker_node_manager.c | 10 +- .../distributed/sql/citus--12.1-1--12.2-1.sql | 2 + .../12.2-1.sql | 15 +- .../latest.sql | 15 +- src/include/distributed/metadata_utility.h | 11 +- ...on.h => rebalancer_placement_separation.h} | 12 +- src/include/distributed/worker_manager.h | 2 +- .../regress/expected/isolate_placement.out | 22 +- .../isolation_create_distributed_table.out | 2 +- 13 files changed, 243 insertions(+), 262 deletions(-) rename src/backend/distributed/operations/{rebalancer_placement_isolation.c => rebalancer_placement_separation.c} (62%) rename src/include/distributed/{rebalancer_placement_isolation.h => rebalancer_placement_separation.h} (66%) diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 232b3ab35..debf7a98d 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -95,8 +95,6 @@ char *EnableManualMetadataChangesForUser = ""; int MetadataSyncTransMode = METADATA_SYNC_TRANSACTIONAL; -static Datum citus_internal_add_shard_metadata_internal(PG_FUNCTION_ARGS, - bool expectNeedsSeparateNode); static void EnsureObjectMetadataIsSane(int distributionArgumentIndex, int colocationId); static List * GetFunctionDependenciesForObjects(ObjectAddress *objectAddress); @@ -169,7 +167,6 @@ PG_FUNCTION_INFO_V1(worker_record_sequence_dependency); PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_partition_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata); -PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata_legacy); PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_placement_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata_legacy); @@ -3226,33 +3223,6 @@ citus_internal_delete_partition_metadata(PG_FUNCTION_ARGS) */ Datum citus_internal_add_shard_metadata(PG_FUNCTION_ARGS) -{ - bool expectNeedsSeparateNode = true; - return citus_internal_add_shard_metadata_internal(fcinfo, expectNeedsSeparateNode); -} - - -/* - * citus_internal_add_shard_metadata is an internal UDF to - * add a row to pg_dist_shard, but without the needs_separate_node - * parameter. - */ -Datum -citus_internal_add_shard_metadata_legacy(PG_FUNCTION_ARGS) -{ - bool expectNeedsSeparateNode = false; - return citus_internal_add_shard_metadata_internal(fcinfo, expectNeedsSeparateNode); -} - - -/* - * citus_internal_add_shard_metadata_internal is a helper function for - * citus_internal_add_shard_metadata and citus_internal_add_shard_metadata_legacy - * functions. 
- */ -static Datum -citus_internal_add_shard_metadata_internal(PG_FUNCTION_ARGS, - bool expectNeedsSeparateNode) { CheckCitusVersion(ERROR); @@ -3277,12 +3247,8 @@ citus_internal_add_shard_metadata_internal(PG_FUNCTION_ARGS, shardMaxValue = PG_GETARG_TEXT_P(4); } - bool needsSeparateNode = false; - if (expectNeedsSeparateNode) - { - PG_ENSURE_ARGNOTNULL(5, "needs separate node"); - needsSeparateNode = PG_GETARG_BOOL(5); - } + PG_ENSURE_ARGNOTNULL(5, "needs separate node"); + bool needsSeparateNode = PG_GETARG_BOOL(5); /* only owner of the table (or superuser) is allowed to add the Citus metadata */ EnsureTableOwner(relationId); diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 70fc7eeb7..ab7470b21 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -115,7 +115,7 @@ static void AppendShardIdNameValues(StringInfo selectQuery, ShardInterval *shard static HeapTuple CreateDiskSpaceTuple(TupleDesc tupleDesc, uint64 availableBytes, uint64 totalBytes); static bool GetLocalDiskSpaceStats(uint64 *availableBytes, uint64 *totalBytes); -static void citus_shard_property_set_anti_affinity(uint64 shardId, bool enabled); +static void CitusShardPropertySetAntiAffinity(uint64 shardId, bool enabled); static void ShardGroupSetNeedsSeparateNodeGlobally(uint64 shardId, bool enabled); static void ShardSetNeedsSeparateNode(uint64 shardId, bool enabled); static BackgroundTask * DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, @@ -373,17 +373,15 @@ citus_shard_property_set(PG_FUNCTION_ARGS) PG_ENSURE_ARGNOTNULL(0, "shard_id"); uint64 shardId = PG_GETARG_INT64(0); - if (!ShardExists(shardId)) - { - ereport(ERROR, (errmsg("shard %lu does not exist", shardId))); - } - + /* RelationIdForShard() first checks whether the shard id is valid */ Oid distributedRelationId = RelationIdForShard(shardId); + List *colocatedTableList = ColocatedTableList(distributedRelationId); + colocatedTableList = SortList(colocatedTableList, CompareOids); EnsureTableListOwner(colocatedTableList); AcquirePlacementColocationLock(distributedRelationId, ExclusiveLock, - "set anti affinity property for a shard of"); + "set a property for a shard of"); Oid colocatedTableId = InvalidOid; foreach_oid(colocatedTableId, colocatedTableList) @@ -394,7 +392,7 @@ citus_shard_property_set(PG_FUNCTION_ARGS) if (!PG_ARGISNULL(1)) { bool antiAffinity = PG_GETARG_BOOL(1); - citus_shard_property_set_anti_affinity(shardId, antiAffinity); + CitusShardPropertySetAntiAffinity(shardId, antiAffinity); } PG_RETURN_VOID(); @@ -402,19 +400,19 @@ citus_shard_property_set(PG_FUNCTION_ARGS) /* - * citus_shard_property_set_anti_affinity is an helper function for + * CitusShardPropertySetAntiAffinity is an helper function for * citus_shard_property_set UDF to set anti_affinity property for given * shard. 
*/ static void -citus_shard_property_set_anti_affinity(uint64 shardId, bool enabled) +CitusShardPropertySetAntiAffinity(uint64 shardId, bool enabled) { Oid distributedRelationId = RelationIdForShard(shardId); if (!IsCitusTableType(distributedRelationId, HASH_DISTRIBUTED) && !IsCitusTableType(distributedRelationId, SINGLE_SHARD_DISTRIBUTED)) { - ereport(ERROR, (errmsg("shard isolation is only supported for hash " - "distributed tables"))); + ereport(ERROR, (errmsg("setting anti-affinity property is only " + "supported for hash distributed tables"))); } ShardGroupSetNeedsSeparateNodeGlobally(shardId, enabled); @@ -1502,18 +1500,17 @@ ShardLength(uint64 shardId) /* - * NodeGroupGetSeparatedShardPlacementGroup returns the shard placement group + * NodeGroupGetSeparatedShardgroupPlacement returns the shard group placement * that given node group is used to separate from others. Returns NULL if this - * node is not used to separate a shard placement group. + * node is not used to separate a shard group placement. */ -ShardPlacementGroup * -NodeGroupGetSeparatedShardPlacementGroup(int32 groupId) +ShardgroupPlacement * +NodeGroupGetSeparatedShardgroupPlacement(int32 groupId) { - ShardPlacementGroup *nodeShardPlacementGroup = NULL; - bool shardPlacementGroupNeedsSeparateNode = false; + ShardgroupPlacement *nodeShardgroupPlacement = NULL; + bool shardgroupPlacementNeedsSeparateNode = false; bool indexOK = true; - int scanKeyCount = 1; ScanKeyData scanKey[1]; Relation pgDistPlacement = table_open(DistPlacementRelationId(), @@ -1524,10 +1521,10 @@ NodeGroupGetSeparatedShardPlacementGroup(int32 groupId) SysScanDesc scanDescriptor = systable_beginscan(pgDistPlacement, DistPlacementGroupidIndexId(), indexOK, - NULL, scanKeyCount, scanKey); + NULL, lengthof(scanKey), scanKey); - HeapTuple heapTuple = systable_getnext(scanDescriptor); - while (HeapTupleIsValid(heapTuple)) + HeapTuple heapTuple = NULL; + while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) { TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement); @@ -1538,46 +1535,49 @@ NodeGroupGetSeparatedShardPlacementGroup(int32 groupId) Oid citusTableId = shardInterval->relationId; if (!IsCitusTableType(citusTableId, DISTRIBUTED_TABLE)) { - heapTuple = systable_getnext(scanDescriptor); continue; } - ShardPlacementGroup *shardPlacementGroup = - GetShardPlacementGroupForPlacement(placement->shardId, + ShardgroupPlacement *shardgroupPlacement = + GetShardgroupPlacementForPlacement(placement->shardId, placement->placementId); - if (nodeShardPlacementGroup && - !ShardPlacementGroupsSame(shardPlacementGroup, - nodeShardPlacementGroup)) + if (nodeShardgroupPlacement && + !ShardgroupPlacementsSame(shardgroupPlacement, + nodeShardgroupPlacement)) { - nodeShardPlacementGroup = NULL; + /* + * If we have more than one shardgroup placement on the node, + * then this means that the node is not actually used to separate + * a shardgroup placement. 
+ */ + nodeShardgroupPlacement = NULL; + shardgroupPlacementNeedsSeparateNode = false; break; } - nodeShardPlacementGroup = shardPlacementGroup; - shardPlacementGroupNeedsSeparateNode = shardInterval->needsSeparateNode; - - heapTuple = systable_getnext(scanDescriptor); + nodeShardgroupPlacement = shardgroupPlacement; + shardgroupPlacementNeedsSeparateNode = shardInterval->needsSeparateNode; } systable_endscan(scanDescriptor); table_close(pgDistPlacement, NoLock); - if (!shardPlacementGroupNeedsSeparateNode) + if (!shardgroupPlacementNeedsSeparateNode) { return NULL; } - return nodeShardPlacementGroup; + return nodeShardgroupPlacement; } /* - * ShardPlacementGroupsSame returns true if two shard placement groups are the same. + * ShardgroupPlacementsSame returns true if two shardgroup placements are the same. */ bool -ShardPlacementGroupsSame(const ShardPlacementGroup *leftGroup, - const ShardPlacementGroup *rightGroup) +ShardgroupPlacementsSame(const ShardgroupPlacement *leftGroup, + const ShardgroupPlacement *rightGroup) { return leftGroup->colocatationId == rightGroup->colocatationId && leftGroup->shardIntervalIndex == rightGroup->shardIntervalIndex && @@ -1618,16 +1618,61 @@ NodeGroupHasShardPlacements(int32 groupId) /* - * GetShardPlacementGroupForPlacement returns ShardPlacementGroup that placement + * NodeGroupHasDistributedTableShardPlacements returns whether any active + * distributed table shards are placed on the group + */ +bool +NodeGroupHasDistributedTableShardPlacements(int32 groupId) +{ + bool nodeGroupHasDistributedTableShardPlacements = false; + + Relation pgPlacement = table_open(DistPlacementRelationId(), AccessShareLock); + + ScanKeyData scanKey[1]; + ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_groupid, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId)); + + bool indexOK = true; + SysScanDesc scanDescriptor = systable_beginscan(pgPlacement, + DistPlacementGroupidIndexId(), + indexOK, + NULL, lengthof(scanKey), scanKey); + + HeapTuple heapTuple = NULL; + while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) + { + TupleDesc tupleDescriptor = RelationGetDescr(pgPlacement); + + GroupShardPlacement *placement = + TupleToGroupShardPlacement(tupleDescriptor, heapTuple); + + ShardInterval *shardInterval = LoadShardInterval(placement->shardId); + Oid citusTableId = shardInterval->relationId; + if (IsCitusTableType(citusTableId, DISTRIBUTED_TABLE)) + { + nodeGroupHasDistributedTableShardPlacements = true; + break; + } + } + + systable_endscan(scanDescriptor); + table_close(pgPlacement, NoLock); + + return nodeGroupHasDistributedTableShardPlacements; +} + + +/* + * GetShardgroupPlacementForPlacement returns ShardgroupPlacement that placement * with given shardId & placementId belongs to. 
*/ -ShardPlacementGroup * -GetShardPlacementGroupForPlacement(uint64 shardId, uint64 placementId) +ShardgroupPlacement * +GetShardgroupPlacementForPlacement(uint64 shardId, uint64 placementId) { ShardPlacement *shardPlacement = LoadShardPlacement(shardId, placementId); ShardInterval *shardInterval = LoadShardInterval(shardId); - ShardPlacementGroup *placementGroup = palloc(sizeof(ShardPlacementGroup)); + ShardgroupPlacement *placementGroup = palloc(sizeof(ShardgroupPlacement)); placementGroup->colocatationId = shardPlacement->colocationGroupId; placementGroup->shardIntervalIndex = shardInterval->shardIndex; placementGroup->nodeGroupId = shardPlacement->groupId; diff --git a/src/backend/distributed/operations/rebalancer_placement_isolation.c b/src/backend/distributed/operations/rebalancer_placement_separation.c similarity index 62% rename from src/backend/distributed/operations/rebalancer_placement_isolation.c rename to src/backend/distributed/operations/rebalancer_placement_separation.c index 6197004cf..86c60afaf 100644 --- a/src/backend/distributed/operations/rebalancer_placement_isolation.c +++ b/src/backend/distributed/operations/rebalancer_placement_separation.c @@ -1,6 +1,6 @@ /*------------------------------------------------------------------------- * - * rebalancer_placement_isolation.c + * rebalancer_placement_separation.c * Routines to determine which worker node should be used to separate * a colocated set of shard placements that need separate nodes. * @@ -10,6 +10,7 @@ */ #include "postgres.h" + #include "nodes/pg_list.h" #include "utils/hsearch.h" #include "utils/lsyscache.h" @@ -20,12 +21,17 @@ #include "distributed/metadata_cache.h" #include "distributed/metadata_utility.h" #include "distributed/multi_physical_planner.h" -#include "distributed/rebalancer_placement_isolation.h" +#include "distributed/rebalancer_placement_separation.h" #include "distributed/shard_rebalancer.h" -struct RebalancerPlacementIsolationContext +struct RebalancerPlacementSeparationContext { + /* + * Hash table where each entry is of the form NodeToPlacementGroupHashEntry, + * meaning that each entry maps the node with nodeGroupId to + * a NodeToPlacementGroupHashEntry. + */ HTAB *nodePlacementGroupHash; }; @@ -35,7 +41,7 @@ struct RebalancerPlacementIsolationContext * placement group that is determined to be separated from other shards in * the cluster via that node. */ -typedef struct +typedef struct NodeToPlacementGroupHashEntry { /* hash key -- group id of the node */ int32 nodeGroupId; @@ -49,39 +55,37 @@ typedef struct bool shouldHaveShards; /* - * Whether given node is allowed to separate any shard placement groups. + * Whether given node is allowed to separate any shardgroup placements. */ bool allowedToSeparateAnyPlacementGroup; /* - * Shard placement group that is assigned to this node to be separated + * Shardgroup placement that is assigned to this node to be separated * from others in the cluster. * - * NULL if no shard placement group is assigned yet. + * NULL if no shardgroup placement is assigned yet. */ - ShardPlacementGroup *assignedPlacementGroup; + ShardgroupPlacement *assignedPlacementGroup; } NodeToPlacementGroupHashEntry; /* - * Routines to prepare a hash table where each entry is of type - * NodeToPlacementGroupHashEntry. + * Routines to prepare RebalancerPlacementSeparationContext. 
*/ -static void NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, - List *activeWorkerNodeList, - List *rebalancePlacementList, - WorkerNode *drainWorkerNode); -static void NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash, - List *activeWorkerNodeList, - List *rebalancePlacementList, - FmgrInfo *shardAllowedOnNodeUDF); -static bool NodeToPlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash, - int32 nodeGroupId, - ShardPlacement *shardPlacement, - FmgrInfo *shardAllowedOnNodeUDF); -static NodeToPlacementGroupHashEntry * NodeToPlacementGroupHashGetNodeWithGroupId( - HTAB *nodePlacementGroupHash, - int32 - nodeGroupId); +static void InitRebalancerPlacementSeparationContext( + RebalancerPlacementSeparationContext *context, + List *activeWorkerNodeList, + List *rebalancePlacementList, + WorkerNode *drainWorkerNode); +static void TryAssignPlacementGroupsToNodeGroups( + RebalancerPlacementSeparationContext *context, + List *activeWorkerNodeList, + List *rebalancePlacementList, + FmgrInfo *shardAllowedOnNodeUDF); +static bool TryAssignPlacementGroupToNodeGroup( + RebalancerPlacementSeparationContext *context, + int32 candidateNodeGroupId, + ShardPlacement *shardPlacement, + FmgrInfo *shardAllowedOnNodeUDF); /* other helpers */ @@ -90,49 +94,54 @@ static int WorkerNodeListGetNodeWithGroupId(List *workerNodeList, int32 nodeGrou /* - * PrepareRebalancerPlacementIsolationContext creates RebalancerPlacementIsolationContext - * that keeps track of which worker nodes are used to separate which shard placement groups + * PrepareRebalancerPlacementSeparationContext creates RebalancerPlacementSeparationContext + * that keeps track of which worker nodes are used to separate which shardgroup placements * that need separate nodes. */ -RebalancerPlacementIsolationContext * -PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList, - List *rebalancePlacementList, - WorkerNode *drainWorkerNode, - FmgrInfo *shardAllowedOnNodeUDF) +RebalancerPlacementSeparationContext * +PrepareRebalancerPlacementSeparationContext(List *activeWorkerNodeList, + List *rebalancePlacementList, + WorkerNode *drainWorkerNode, + FmgrInfo *shardAllowedOnNodeUDF) { HTAB *nodePlacementGroupHash = CreateSimpleHashWithNameAndSize(uint32, NodeToPlacementGroupHashEntry, "NodeToPlacementGroupHash", list_length(activeWorkerNodeList)); + RebalancerPlacementSeparationContext *context = + palloc(sizeof(RebalancerPlacementSeparationContext)); + context->nodePlacementGroupHash = nodePlacementGroupHash; + activeWorkerNodeList = SortList(activeWorkerNodeList, CompareWorkerNodes); rebalancePlacementList = SortList(rebalancePlacementList, CompareShardPlacements); - NodeToPlacementGroupHashInit(nodePlacementGroupHash, activeWorkerNodeList, - rebalancePlacementList, drainWorkerNode); + InitRebalancerPlacementSeparationContext(context, activeWorkerNodeList, + rebalancePlacementList, drainWorkerNode); - NodeToPlacementGroupHashAssignNodes(nodePlacementGroupHash, - activeWorkerNodeList, - rebalancePlacementList, - shardAllowedOnNodeUDF); - - RebalancerPlacementIsolationContext *context = - palloc(sizeof(RebalancerPlacementIsolationContext)); - context->nodePlacementGroupHash = nodePlacementGroupHash; + TryAssignPlacementGroupsToNodeGroups(context, + activeWorkerNodeList, + rebalancePlacementList, + shardAllowedOnNodeUDF); return context; } /* - * NodeToPlacementGroupHashInit initializes given hash table where each - * entry is of type NodeToPlacementGroupHashEntry by using given list - * of worker nodes and the worker 
node that is being drained, if specified. + * InitRebalancerPlacementSeparationContext initializes given + * RebalancerPlacementSeparationContext by using given list + * of worker nodes and the worker node that is being drained, + * if specified. */ static void -NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *activeWorkerNodeList, - List *rebalancePlacementList, WorkerNode *drainWorkerNode) +InitRebalancerPlacementSeparationContext(RebalancerPlacementSeparationContext *context, + List *activeWorkerNodeList, + List *rebalancePlacementList, + WorkerNode *drainWorkerNode) { + HTAB *nodePlacementGroupHash = context->nodePlacementGroupHash; + List *placementListUniqueNodeGroupIds = PlacementListGetUniqueNodeGroupIds(rebalancePlacementList); @@ -143,8 +152,6 @@ NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *activeWorkerNod hash_search(nodePlacementGroupHash, &workerNode->groupId, HASH_ENTER, NULL); - nodePlacementGroupHashEntry->nodeGroupId = workerNode->groupId; - bool shouldHaveShards = workerNode->shouldHaveShards; if (drainWorkerNode && drainWorkerNode->groupId == workerNode->groupId) { @@ -183,59 +190,49 @@ NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *activeWorkerNod if (!shouldHaveShards) { - /* we can't assing any shard placement groups to the node anyway */ + /* we can't assing any shardgroup placements to the node anyway */ continue; } - if (list_length(placementListUniqueNodeGroupIds) == list_length( - activeWorkerNodeList)) - { - /* - * list_member_oid() check would return true for all placements then. - * This means that all the nodes are of type D. - */ - Assert(list_member_oid(placementListUniqueNodeGroupIds, workerNode->groupId)); - continue; - } - - if (list_member_oid(placementListUniqueNodeGroupIds, workerNode->groupId)) + if (list_member_int(placementListUniqueNodeGroupIds, workerNode->groupId)) { /* node is of type D */ continue; } - ShardPlacementGroup *separatedShardPlacementGroup = - NodeGroupGetSeparatedShardPlacementGroup( + ShardgroupPlacement *separatedShardgroupPlacement = + NodeGroupGetSeparatedShardgroupPlacement( nodePlacementGroupHashEntry->nodeGroupId); - if (separatedShardPlacementGroup) + if (separatedShardgroupPlacement) { nodePlacementGroupHashEntry->assignedPlacementGroup = - separatedShardPlacementGroup; + separatedShardgroupPlacement; } else { nodePlacementGroupHashEntry->allowedToSeparateAnyPlacementGroup = - !NodeGroupHasShardPlacements(nodePlacementGroupHashEntry->nodeGroupId); + !NodeGroupHasDistributedTableShardPlacements( + nodePlacementGroupHashEntry->nodeGroupId); } } } /* - * NodeToPlacementGroupHashAssignNodes assigns all active shard placements in - * the cluster that need separate nodes to individual worker nodes. + * TryAssignPlacementGroupsToNodeGroups tries to assign placements that need + * separate nodes within given placement list to individual worker nodes. 
*/ static void -NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash, - List *activeWorkerNodeList, - List *rebalancePlacementList, - FmgrInfo *shardAllowedOnNodeUDF) +TryAssignPlacementGroupsToNodeGroups(RebalancerPlacementSeparationContext *context, + List *activeWorkerNodeList, + List *rebalancePlacementList, + FmgrInfo *shardAllowedOnNodeUDF) { List *availableWorkerList = list_copy(activeWorkerNodeList); List *unassignedPlacementList = NIL; /* - * Assign as much as possible shard placement groups to worker nodes where + * Assign as much as possible shardgroup placements to worker nodes where * they are stored already. */ ShardPlacement *shardPlacement = NULL; @@ -247,20 +244,20 @@ NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash, continue; } - int32 shardPlacementGroupId = shardPlacement->groupId; - if (NodeToPlacementGroupHashAssignNode(nodePlacementGroupHash, - shardPlacementGroupId, + int32 currentNodeGroupId = shardPlacement->groupId; + if (TryAssignPlacementGroupToNodeGroup(context, + currentNodeGroupId, shardPlacement, shardAllowedOnNodeUDF)) { /* - * NodeToPlacementGroupHashAssignNode() succeeds for each worker node + * TryAssignPlacementGroupToNodeGroup() succeeds for each worker node * once, hence we must not have removed the worker node from the list * yet, and WorkerNodeListGetNodeWithGroupId() ensures that already. */ int currentPlacementNodeIdx = WorkerNodeListGetNodeWithGroupId(availableWorkerList, - shardPlacementGroupId); + currentNodeGroupId); availableWorkerList = list_delete_nth_cell(availableWorkerList, currentPlacementNodeIdx); } @@ -274,7 +271,7 @@ NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash, bool emitWarning = false; /* - * For the shard placement groups that could not be assigned to their + * For the shardgroup placements that could not be assigned to their * current node, assign them to any other node that is available. */ ShardPlacement *unassignedShardPlacement = NULL; @@ -285,7 +282,7 @@ NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash, WorkerNode *availableWorkerNode = NULL; foreach_ptr(availableWorkerNode, availableWorkerList) { - if (NodeToPlacementGroupHashAssignNode(nodePlacementGroupHash, + if (TryAssignPlacementGroupToNodeGroup(context, availableWorkerNode->groupId, unassignedShardPlacement, shardAllowedOnNodeUDF)) @@ -310,24 +307,32 @@ NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash, /* - * NodeToPlacementGroupHashAssignNode is an helper to - * NodeToPlacementGroupHashAssignNodes that tries to assign given + * TryAssignPlacementGroupToNodeGroup is an helper to + * TryAssignPlacementGroupsToNodeGroups that tries to assign given * shard placement to given node and returns true if it succeeds. 
*/ static bool -NodeToPlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash, - int32 nodeGroupId, +TryAssignPlacementGroupToNodeGroup(RebalancerPlacementSeparationContext *context, + int32 candidateNodeGroupId, ShardPlacement *shardPlacement, FmgrInfo *shardAllowedOnNodeUDF) { + HTAB *nodePlacementGroupHash = context->nodePlacementGroupHash; + + bool found = false; NodeToPlacementGroupHashEntry *nodePlacementGroupHashEntry = - NodeToPlacementGroupHashGetNodeWithGroupId(nodePlacementGroupHash, nodeGroupId); + hash_search(nodePlacementGroupHash, &candidateNodeGroupId, HASH_FIND, &found); + + if (!found) + { + ereport(ERROR, (errmsg("no such node is found"))); + } if (nodePlacementGroupHashEntry->assignedPlacementGroup) { /* * Right now callers of this function call it once for each distinct - * shard placement group, hence we assume that shard placement group + * shardgroup placement, hence we assume that shardgroup placement * that given shard placement belongs to and * nodePlacementGroupHashEntry->assignedPlacementGroup cannot be the * same, without checking. @@ -345,7 +350,7 @@ NodeToPlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash, return false; } - WorkerNode *workerNode = PrimaryNodeForGroup(nodeGroupId, NULL); + WorkerNode *workerNode = PrimaryNodeForGroup(candidateNodeGroupId, NULL); Datum allowed = FunctionCall2(shardAllowedOnNodeUDF, shardPlacement->shardId, workerNode->nodeId); if (!DatumGetBool(allowed)) @@ -354,7 +359,7 @@ NodeToPlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash, } nodePlacementGroupHashEntry->assignedPlacementGroup = - GetShardPlacementGroupForPlacement(shardPlacement->shardId, + GetShardgroupPlacementForPlacement(shardPlacement->shardId, shardPlacement->placementId); return true; @@ -362,28 +367,34 @@ NodeToPlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash, /* - * RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker returns true + * RebalancerPlacementSeparationContextPlacementIsAllowedOnWorker returns true * if shard placement with given shardId & placementId is allowed to be stored * on given worker node. */ bool -RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker( - RebalancerPlacementIsolationContext *context, +RebalancerPlacementSeparationContextPlacementIsAllowedOnWorker( + RebalancerPlacementSeparationContext *context, uint64 shardId, uint64 placementId, WorkerNode *workerNode) { HTAB *nodePlacementGroupHash = context->nodePlacementGroupHash; + + bool found = false; NodeToPlacementGroupHashEntry *nodePlacementGroupHashEntry = - NodeToPlacementGroupHashGetNodeWithGroupId(nodePlacementGroupHash, - workerNode->groupId); + hash_search(nodePlacementGroupHash, &(workerNode->groupId), HASH_FIND, &found); + + if (!found) + { + ereport(ERROR, (errmsg("no such node is found"))); + } ShardInterval *shardInterval = LoadShardInterval(shardId); if (!shardInterval->needsSeparateNode) { /* * It doesn't need a separate node, but is the node used to separate - * a shard placement group? If so, we cannot store it on this node. + * a shardgroup placement? If so, we cannot store it on this node. 
*/ return nodePlacementGroupHashEntry->shouldHaveShards && nodePlacementGroupHashEntry->assignedPlacementGroup == NULL; @@ -399,35 +410,13 @@ RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker( return false; } - ShardPlacementGroup *placementGroup = - GetShardPlacementGroupForPlacement(shardId, placementId); - return ShardPlacementGroupsSame(nodePlacementGroupHashEntry->assignedPlacementGroup, + ShardgroupPlacement *placementGroup = + GetShardgroupPlacementForPlacement(shardId, placementId); + return ShardgroupPlacementsSame(nodePlacementGroupHashEntry->assignedPlacementGroup, placementGroup); } -/* - * NodeToPlacementGroupHashGetNodeWithGroupId searches given hash table for - * NodeToPlacementGroupHashEntry with given node id and returns it. - * - * Throws an error if no such entry is found. - */ -static NodeToPlacementGroupHashEntry * -NodeToPlacementGroupHashGetNodeWithGroupId(HTAB *nodePlacementGroupHash, - int32 nodeGroupId) -{ - NodeToPlacementGroupHashEntry *nodePlacementGroupHashEntry = - hash_search(nodePlacementGroupHash, &nodeGroupId, HASH_FIND, NULL); - - if (nodePlacementGroupHashEntry == NULL) - { - ereport(ERROR, (errmsg("no such node is found"))); - } - - return nodePlacementGroupHashEntry; -} - - /* * PlacementListGetUniqueNodeGroupIds returns a list of unique node group ids * that are used by given list of shard placements. @@ -441,7 +430,7 @@ PlacementListGetUniqueNodeGroupIds(List *placementList) foreach_ptr(shardPlacement, placementList) { placementListUniqueNodeGroupIds = - list_append_unique_oid(placementListUniqueNodeGroupIds, + list_append_unique_int(placementListUniqueNodeGroupIds, shardPlacement->groupId); } diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 391c34bad..5ba6bd23e 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -44,7 +44,7 @@ #include "distributed/pg_dist_rebalance_strategy.h" #include "distributed/reference_table_utils.h" #include "distributed/remote_commands.h" -#include "distributed/rebalancer_placement_isolation.h" +#include "distributed/rebalancer_placement_separation.h" #include "distributed/resource_lock.h" #include "distributed/shard_rebalancer.h" #include "distributed/shard_cleaner.h" @@ -147,7 +147,7 @@ typedef struct RebalanceContext FmgrInfo nodeCapacityUDF; FmgrInfo shardAllowedOnNodeUDF; - RebalancerPlacementIsolationContext *rebalancerPlacementGroupIsolationContext; + RebalancerPlacementSeparationContext *shardgroupPlacementSeparationContext; } RebalanceContext; /* WorkerHashKey contains hostname and port to be used as a key in a hash */ @@ -594,8 +594,8 @@ GetRebalanceSteps(RebalanceOptions *options) options->threshold = options->rebalanceStrategy->minimumThreshold; } - context.rebalancerPlacementGroupIsolationContext = - PrepareRebalancerPlacementIsolationContext( + context.shardgroupPlacementSeparationContext = + PrepareRebalancerPlacementSeparationContext( activeWorkerList, FlattenNestedList(activeShardPlacementListList), options->workerNode, @@ -625,8 +625,8 @@ ShardAllowedOnNode(uint64 shardId, uint64 placementId, WorkerNode *workerNode, RebalanceContext *context = voidContext; - if (!RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker( - context->rebalancerPlacementGroupIsolationContext, + if (!RebalancerPlacementSeparationContextPlacementIsAllowedOnWorker( + context->shardgroupPlacementSeparationContext, shardId, placementId, workerNode)) { return 
false; @@ -3190,7 +3190,7 @@ ReplicationPlacementUpdates(List *workerNodeList, List *activeShardPlacementList { WorkerNode *workerNode = list_nth(workerNodeList, workerNodeIndex); - if (!NodeCanBeUsedForNonIsolatedPlacements(workerNode)) + if (!NodeCanBeUsedForNonSeparatedPlacements(workerNode)) { /* never replicate placements to nodes that should not have placements */ continue; diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index 8ecdb4864..0bb70627a 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -273,22 +273,22 @@ ErrorIfCoordinatorNotAddedAsWorkerNode() /* * NewDistributedTablePlacementNodeList returns a list of all active, primary * worker nodes that can store new data, i.e shouldstoreshards is 'true' - * and that is not used to isolate a shard placement group. + * and that is not used to isolate a shardgroup placement. */ List * NewDistributedTablePlacementNodeList(LOCKMODE lockMode) { EnsureModificationsCanRun(); - return FilterActiveNodeListFunc(lockMode, NodeCanBeUsedForNonIsolatedPlacements); + return FilterActiveNodeListFunc(lockMode, NodeCanBeUsedForNonSeparatedPlacements); } /* - * NodeCanBeUsedForNonIsolatedPlacements returns true if given node can be + * NodeCanBeUsedForNonSeparatedPlacements returns true if given node can be * used to store shard placements that don't need separate nodes. */ bool -NodeCanBeUsedForNonIsolatedPlacements(WorkerNode *node) +NodeCanBeUsedForNonSeparatedPlacements(WorkerNode *node) { if (!NodeIsPrimary(node)) { @@ -300,7 +300,7 @@ NodeCanBeUsedForNonIsolatedPlacements(WorkerNode *node) return false; } - return NodeGroupGetSeparatedShardPlacementGroup(node->groupId) == NULL; + return NodeGroupGetSeparatedShardgroupPlacement(node->groupId) == NULL; } diff --git a/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql b/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql index 305b7f01d..6883e5622 100644 --- a/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql +++ b/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql @@ -6,7 +6,9 @@ ALTER TABLE pg_dist_shard ADD COLUMN needsseparatenode boolean NOT NULL DEFAULT false; +DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text); #include "udfs/citus_internal_add_shard_metadata/12.2-1.sql" + #include "udfs/citus_internal_shard_group_set_needsseparatenode/12.2-1.sql" #include "udfs/citus_shard_property_set/12.2-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/12.2-1.sql b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/12.2-1.sql index 78f3e8f54..2800b2c3f 100644 --- a/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/12.2-1.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/12.2-1.sql @@ -2,22 +2,11 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata( relation_id regclass, shard_id bigint, storage_type "char", shard_min_value text, - shard_max_value text, needs_separate_node boolean + shard_max_value text, + needs_separate_node boolean default false ) RETURNS void LANGUAGE C AS 'MODULE_PATHNAME', $$citus_internal_add_shard_metadata$$; COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text, boolean) IS 'Inserts into pg_dist_shard with user checks'; - --- replace the old one so it would call the old C function without 
needs_separate_node -CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata( - relation_id regclass, shard_id bigint, - storage_type "char", shard_min_value text, - shard_max_value text - ) - RETURNS void - LANGUAGE C - AS 'MODULE_PATHNAME', $$citus_internal_add_shard_metadata_legacy$$; -COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text) IS - 'Inserts into pg_dist_shard with user checks'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql index 78f3e8f54..2800b2c3f 100644 --- a/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql @@ -2,22 +2,11 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata( relation_id regclass, shard_id bigint, storage_type "char", shard_min_value text, - shard_max_value text, needs_separate_node boolean + shard_max_value text, + needs_separate_node boolean default false ) RETURNS void LANGUAGE C AS 'MODULE_PATHNAME', $$citus_internal_add_shard_metadata$$; COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text, boolean) IS 'Inserts into pg_dist_shard with user checks'; - --- replace the old one so it would call the old C function without needs_separate_node -CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata( - relation_id regclass, shard_id bigint, - storage_type "char", shard_min_value text, - shard_max_value text - ) - RETURNS void - LANGUAGE C - AS 'MODULE_PATHNAME', $$citus_internal_add_shard_metadata_legacy$$; -COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text) IS - 'Inserts into pg_dist_shard with user checks'; diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 17bd8a4a8..eeb0b34b4 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -109,7 +109,7 @@ typedef struct uint32 colocatationId; int shardIntervalIndex; int32 nodeGroupId; -} ShardPlacementGroup; +} ShardgroupPlacement; typedef enum CascadeToColocatedOption @@ -331,11 +331,12 @@ extern int ShardIntervalCount(Oid relationId); extern List * LoadShardList(Oid relationId); extern ShardInterval * CopyShardInterval(ShardInterval *srcInterval); extern uint64 ShardLength(uint64 shardId); -extern ShardPlacementGroup * NodeGroupGetSeparatedShardPlacementGroup(int32 groupId); -extern bool ShardPlacementGroupsSame(const ShardPlacementGroup *leftGroup, - const ShardPlacementGroup *rightGroup); +extern ShardgroupPlacement * NodeGroupGetSeparatedShardgroupPlacement(int32 groupId); +extern bool ShardgroupPlacementsSame(const ShardgroupPlacement *leftGroup, + const ShardgroupPlacement *rightGroup); extern bool NodeGroupHasShardPlacements(int32 groupId); -extern ShardPlacementGroup * GetShardPlacementGroupForPlacement(uint64 shardId, +extern bool NodeGroupHasDistributedTableShardPlacements(int32 groupId); +extern ShardgroupPlacement * GetShardgroupPlacementForPlacement(uint64 shardId, uint64 placementId); extern bool IsActiveShardPlacement(ShardPlacement *ShardPlacement); extern bool IsRemoteShardPlacement(ShardPlacement *shardPlacement); diff --git a/src/include/distributed/rebalancer_placement_isolation.h b/src/include/distributed/rebalancer_placement_separation.h similarity 
index 66% rename from src/include/distributed/rebalancer_placement_isolation.h rename to src/include/distributed/rebalancer_placement_separation.h index 7d4c4253e..b00b3f87c 100644 --- a/src/include/distributed/rebalancer_placement_isolation.h +++ b/src/include/distributed/rebalancer_placement_separation.h @@ -1,6 +1,6 @@ /*------------------------------------------------------------------------- * - * rebalancer_placement_isolation.h + * rebalancer_placement_separation.h * Routines to determine which worker node should be used to separate * a colocated set of shard placements that need separate nodes. * @@ -18,10 +18,10 @@ #include "distributed/metadata_utility.h" -struct RebalancerPlacementIsolationContext; -typedef struct RebalancerPlacementIsolationContext RebalancerPlacementIsolationContext; +struct RebalancerPlacementSeparationContext; +typedef struct RebalancerPlacementSeparationContext RebalancerPlacementSeparationContext; -extern RebalancerPlacementIsolationContext * PrepareRebalancerPlacementIsolationContext( +extern RebalancerPlacementSeparationContext * PrepareRebalancerPlacementSeparationContext( List *activeWorkerNodeList, List * @@ -32,8 +32,8 @@ extern RebalancerPlacementIsolationContext * PrepareRebalancerPlacementIsolation FmgrInfo * shardAllowedOnNodeUDF); -extern bool RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker( - RebalancerPlacementIsolationContext *context, +extern bool RebalancerPlacementSeparationContextPlacementIsAllowedOnWorker( + RebalancerPlacementSeparationContext *context, uint64 shardId, uint64 placementId, diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index d35b8e0a2..92af8dc0f 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -78,7 +78,7 @@ extern List * ReferenceTablePlacementNodeList(LOCKMODE lockMode); extern WorkerNode * CoordinatorNodeIfAddedAsWorkerOrError(void); extern void ErrorIfCoordinatorNotAddedAsWorkerNode(void); extern List * NewDistributedTablePlacementNodeList(LOCKMODE lockMode); -extern bool NodeCanBeUsedForNonIsolatedPlacements(WorkerNode *node); +extern bool NodeCanBeUsedForNonSeparatedPlacements(WorkerNode *node); extern List * ActiveReadableNonCoordinatorNodeList(void); extern List * ActiveReadableNodeList(void); extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort); diff --git a/src/test/regress/expected/isolate_placement.out b/src/test/regress/expected/isolate_placement.out index 2a7e75d5d..354fa4fb0 100644 --- a/src/test/regress/expected/isolate_placement.out +++ b/src/test/regress/expected/isolate_placement.out @@ -812,14 +812,14 @@ ORDER BY result; SELECT citus_shard_property_set(NULL, anti_affinity=>true); ERROR: shard_id cannot be NULL SELECT citus_shard_property_set(0, anti_affinity=>true); -ERROR: shard xxxxx does not exist +ERROR: could not find valid entry for shard xxxxx SELECT citus_shard_property_set(NULL, anti_affinity=>false); ERROR: shard_id cannot be NULL SELECT citus_shard_property_set(0, anti_affinity=>false); -ERROR: shard xxxxx does not exist +ERROR: could not find valid entry for shard xxxxx -- we verify whether shard exists even if anti_affinity is not provided SELECT citus_shard_property_set(0, anti_affinity=>NULL); -ERROR: shard xxxxx does not exist +ERROR: could not find valid entry for shard xxxxx CREATE TABLE append_table (a int, b int); SELECT create_distributed_table('append_table', 'a', 'append'); create_distributed_table @@ -858,21 +858,21 @@ SELECT 
citus_add_local_table_to_metadata('local_table'); -- all should fail SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid = 'append_table'::regclass LIMIT 1; -ERROR: shard isolation is only supported for hash distributed tables +ERROR: setting anti-affinity property is only supported for hash distributed tables SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid = 'range_table'::regclass LIMIT 1; -ERROR: shard isolation is only supported for hash distributed tables +ERROR: setting anti-affinity property is only supported for hash distributed tables SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid = 'ref_table'::regclass LIMIT 1; -ERROR: shard isolation is only supported for hash distributed tables +ERROR: setting anti-affinity property is only supported for hash distributed tables SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid = 'local_table'::regclass LIMIT 1; -ERROR: shard isolation is only supported for hash distributed tables +ERROR: setting anti-affinity property is only supported for hash distributed tables SELECT citus_shard_property_set(shardid, anti_affinity=>false) FROM pg_dist_shard WHERE logicalrelid = 'append_table'::regclass LIMIT 1; -ERROR: shard isolation is only supported for hash distributed tables +ERROR: setting anti-affinity property is only supported for hash distributed tables SELECT citus_shard_property_set(shardid, anti_affinity=>false) FROM pg_dist_shard WHERE logicalrelid = 'range_table'::regclass LIMIT 1; -ERROR: shard isolation is only supported for hash distributed tables +ERROR: setting anti-affinity property is only supported for hash distributed tables SELECT citus_shard_property_set(shardid, anti_affinity=>false) FROM pg_dist_shard WHERE logicalrelid = 'ref_table'::regclass LIMIT 1; -ERROR: shard isolation is only supported for hash distributed tables +ERROR: setting anti-affinity property is only supported for hash distributed tables SELECT citus_shard_property_set(shardid, anti_affinity=>false) FROM pg_dist_shard WHERE logicalrelid = 'local_table'::regclass LIMIT 1; -ERROR: shard isolation is only supported for hash distributed tables +ERROR: setting anti-affinity property is only supported for hash distributed tables DROP TABLE range_table; DROP TYPE composite_key_type; SET client_min_messages TO WARNING; diff --git a/src/test/regress/expected/isolation_create_distributed_table.out b/src/test/regress/expected/isolation_create_distributed_table.out index 714b6403f..9af52828d 100644 --- a/src/test/regress/expected/isolation_create_distributed_table.out +++ b/src/test/regress/expected/isolation_create_distributed_table.out @@ -173,7 +173,7 @@ step s1_set-shard-property: FROM pg_dist_shard WHERE logicalrelid = 'table_to_distribute'::regclass ORDER BY shardid LIMIT 1; -ERROR: could not acquire the lock required to set anti affinity property for a shard of public.table_to_distribute +ERROR: could not acquire the lock required to set a property for a shard of public.table_to_distribute step s1-rollback: ROLLBACK;
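
Usage note (not part of the patch): a minimal sketch of how the citus_shard_property_set UDF exercised in the regression tests above is invoked, assuming a hash-distributed table named dist_table (hypothetical); the shard id is looked up from pg_dist_shard rather than hard-coded:

    -- mark one shard group of a hash-distributed table as anti-affine,
    -- i.e. needing its own worker node during rebalancing
    SELECT citus_shard_property_set(shardid, anti_affinity => true)
    FROM pg_dist_shard
    WHERE logicalrelid = 'dist_table'::regclass
    ORDER BY shardid
    LIMIT 1;

    -- clear the property again
    SELECT citus_shard_property_set(shardid, anti_affinity => false)
    FROM pg_dist_shard
    WHERE logicalrelid = 'dist_table'::regclass
    ORDER BY shardid
    LIMIT 1;

Per the C implementation in metadata_utility.c above, a NULL shard_id, a non-existent shard, or a shard of a table that is neither hash-distributed nor single-shard distributed raises an error.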