mirror of https://github.com/citusdata/citus.git
Persisting TO_DELETE state correctly on split
parent dd548ee3c7
commit 479ccfae89
@@ -1266,10 +1266,11 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId,
     /* set shard storage type according to relation type */
     char shardStorageType = ShardStorageType(citusLocalTableId);
+    ShardState shardState = SHARD_STATE_ACTIVE;
 
     text *shardMinValue = NULL;
     text *shardMaxValue = NULL;
-    InsertShardRow(citusLocalTableId, shardId, shardStorageType,
+    InsertShardRow(citusLocalTableId, shardId, shardStorageType, shardState,
                    shardMinValue, shardMaxValue);
 
     List *nodeList = list_make1(CoordinatorNodeIfAddedAsWorkerOrError());
@@ -1616,6 +1616,7 @@ List *
 DDLTaskList(Oid relationId, const char *commandString)
 {
     List *taskList = NIL;
+    /* TODO(niupre): This should be all shards? */
     List *shardIntervalList = LoadShardIntervalList(relationId);
     Oid schemaId = get_rel_namespace(relationId);
     char *schemaName = get_namespace_name(schemaId);
@@ -357,7 +357,8 @@ QueryTupleShardSearchInfo(ArrayType *minValuesArray, ArrayType *maxValuesArray,
         [Anum_pg_dist_shard_shardid - 1] = partitionIndex,
         [Anum_pg_dist_shard_shardstorage - 1] = SHARD_STORAGE_VIRTUAL,
         [Anum_pg_dist_shard_shardminvalue - 1] = minValues[partitionIndex],
-        [Anum_pg_dist_shard_shardmaxvalue - 1] = maxValues[partitionIndex]
+        [Anum_pg_dist_shard_shardmaxvalue - 1] = maxValues[partitionIndex],
+        [Anum_pg_dist_shard_shardstate - 1] = SHARD_STATE_INVALID_FIRST
     };
     bool nullsArray[Natts_pg_dist_shard] = {
         [Anum_pg_dist_shard_shardminvalue - 1] = minValueNulls[partitionIndex],
@@ -1778,7 +1778,9 @@ ErrorIfInconsistentShardIntervals(CitusTableCacheEntry *cacheEntry)
 {
     /*
      * If table is hash-partitioned and has shards, there never should be any
-     * uninitalized shards. Historically we've not prevented that for range
+     * uninitialized shards. The exception is shards marked as TO_DELETE
+     * after a split, which are pending deletion.
+     * Historically we've not prevented that for range
      * partitioned tables, but it might be a good idea to start doing so.
      */
     if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
@@ -1900,7 +1902,11 @@ HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
                                                    curShardInterval->minValue);
         comparisonResult = DatumGetInt32(comparisonDatum);
 
-        if (comparisonResult >= 0)
+        /* if one of the shards is marked as TO_DELETE, ignore the overlap */
+        bool markedForDelete = (lastShardInterval->shardState == SHARD_STATE_TO_DELETE ||
+                                curShardInterval->shardState == SHARD_STATE_TO_DELETE);
+
+        if (!markedForDelete && comparisonResult >= 0)
         {
             return true;
         }
@@ -4815,6 +4821,8 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
         maxValueExists = true;
     }
 
+    char shardState = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstate - 1]);
+
     ShardInterval *shardInterval = CitusMakeNode(ShardInterval);
     shardInterval->relationId = relationId;
     shardInterval->storageType = storageType;
@@ -4826,6 +4834,7 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
     shardInterval->minValue = minValue;
     shardInterval->maxValue = maxValue;
     shardInterval->shardId = shardId;
+    shardInterval->shardState = shardState;
 
     return shardInterval;
 }
@@ -132,9 +132,9 @@ static void EnsurePartitionMetadataIsSane(Oid relationId, char distributionMetho
                                           int colocationId, char replicationModel,
                                           Var *distributionKey);
 static void EnsureCoordinatorInitiatedOperation(void);
-static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
-                                      text *shardMinValue,
-                                      text *shardMaxValue);
+static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId,
+                                      char storageType, char shardState,
+                                      text *shardMinValue, text *shardMaxValue);
 static void EnsureShardPlacementMetadataIsSane(Oid relationId, int64 shardId,
                                                int64 placementId, int32 shardState,
                                                int64 shardLength, int32 groupId);
@@ -162,6 +162,7 @@ PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata);
+PG_FUNCTION_INFO_V1(citus_internal_update_shard_and_placement_state_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_update_relation_colocation);
 PG_FUNCTION_INFO_V1(citus_internal_add_object_metadata);
@@ -1295,7 +1296,7 @@ ShardListInsertCommand(List *shardIntervalList)
     /* now add shards to insertShardCommand */
     StringInfo insertShardCommand = makeStringInfo();
     appendStringInfo(insertShardCommand,
-                     "WITH shard_data(relationname, shardid, storagetype, "
+                     "WITH shard_data(relationname, shardid, storagetype, shardstate, "
                      "shardminvalue, shardmaxvalue) AS (VALUES ");
 
     foreach_ptr(shardInterval, shardIntervalList)
@@ -1328,10 +1329,11 @@ ShardListInsertCommand(List *shardIntervalList)
         }
 
         appendStringInfo(insertShardCommand,
-                         "(%s::regclass, %ld, '%c'::\"char\", %s, %s)",
+                         "(%s::regclass, %ld, '%c'::\"char\", %d, %s, %s)",
                          quote_literal_cstr(qualifiedRelationName),
                          shardId,
                          shardInterval->storageType,
+                         shardInterval->shardState,
                          minHashToken->data,
                          maxHashToken->data);
 
@@ -1345,7 +1347,7 @@ ShardListInsertCommand(List *shardIntervalList)
 
     appendStringInfo(insertShardCommand,
                      "SELECT citus_internal_add_shard_metadata(relationname, shardid, "
-                     "storagetype, shardminvalue, shardmaxvalue) "
+                     "storagetype, shardstate, shardminvalue, shardmaxvalue) "
                      "FROM shard_data;");
 
     /*
@@ -3196,16 +3198,19 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
     PG_ENSURE_ARGNOTNULL(2, "storage type");
     char storageType = PG_GETARG_CHAR(2);
 
+    PG_ENSURE_ARGNOTNULL(3, "shard state");
+    ShardState shardState = PG_GETARG_INT32(3);
+
     text *shardMinValue = NULL;
-    if (!PG_ARGISNULL(3))
+    if (!PG_ARGISNULL(4))
     {
-        shardMinValue = PG_GETARG_TEXT_P(3);
+        shardMinValue = PG_GETARG_TEXT_P(4);
     }
 
     text *shardMaxValue = NULL;
-    if (!PG_ARGISNULL(4))
+    if (!PG_ARGISNULL(5))
     {
-        shardMaxValue = PG_GETARG_TEXT_P(4);
+        shardMaxValue = PG_GETARG_TEXT_P(5);
     }
 
     /* only owner of the table (or superuser) is allowed to add the Citus metadata */
@@ -3224,11 +3229,11 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
          * not sane, the user can only affect its own tables. Given that the
          * user is owner of the table, we should allow.
          */
-        EnsureShardMetadataIsSane(relationId, shardId, storageType, shardMinValue,
+        EnsureShardMetadataIsSane(relationId, shardId, storageType, shardState, shardMinValue,
                                   shardMaxValue);
     }
 
-    InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue);
+    InsertShardRow(relationId, shardId, storageType, shardState, shardMinValue, shardMaxValue);
 
     PG_RETURN_VOID();
 }
@@ -3262,7 +3267,8 @@ EnsureCoordinatorInitiatedOperation(void)
  * for inserting into pg_dist_shard metadata.
  */
 static void
-EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
+EnsureShardMetadataIsSane(Oid relationId, int64 shardId,
+                          char storageType, char shardState,
                           text *shardMinValue, text *shardMaxValue)
 {
     if (shardId <= INVALID_SHARD_ID)
@@ -3278,6 +3284,13 @@ EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
                         errmsg("Invalid shard storage type: %c", storageType)));
     }
 
+    if (!(shardState == SHARD_STATE_ACTIVE ||
+          shardState == SHARD_STATE_TO_DELETE))
+    {
+        ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                        errmsg("Invalid shard state: %d", shardState)));
+    }
+
     char partitionMethod = PartitionMethodViaCatalog(relationId);
     if (partitionMethod == DISTRIBUTE_BY_INVALID)
     {
@@ -3362,8 +3375,10 @@ EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
 
         Datum firstMin = Int32GetDatum(shardMinValueInt);
         Datum firstMax = Int32GetDatum(shardMaxValueInt);
+        ShardState firstState = shardState;
         Datum secondMin = shardInterval->minValue;
         Datum secondMax = shardInterval->maxValue;
+        ShardState secondState = shardInterval->shardState;
         Oid collationId = InvalidOid;
 
         /*
@@ -3380,6 +3395,7 @@ EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
         }
 
         if (ShardIntervalsOverlapWithParams(firstMin, firstMax, secondMin, secondMax,
+                                            firstState, secondState,
                                             shardIntervalCompareFunction,
                                             collationId))
         {
@@ -3574,6 +3590,83 @@ citus_internal_update_placement_metadata(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * citus_internal_update_shard_and_placement_state_metadata is an internal UDF to
+ * update the shardState value for a shard and its placement in pg_dist_shard
+ * and pg_dist_placement.
+ */
+Datum
+citus_internal_update_shard_and_placement_state_metadata(PG_FUNCTION_ARGS)
+{
+    CheckCitusVersion(ERROR);
+
+    int64 shardId = PG_GETARG_INT64(0);
+    ShardState newState = PG_GETARG_INT32(1);
+
+    ShardPlacement *placement = NULL;
+    if (!ShouldSkipMetadataChecks())
+    {
+        /* this UDF is not allowed to be executed as a separate command */
+        EnsureCoordinatorInitiatedOperation();
+
+        if (!ShardExists(shardId))
+        {
+            ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                            errmsg("Shard id does not exist: %ld", shardId)));
+        }
+
+        bool missingOk = false;
+        EnsureShardOwner(shardId, missingOk);
+
+        /*
+         * This function ensures that the source group exists, hence we
+         * call it from this code block.
+         */
+        List *shardPlacementList = ActiveShardPlacementList(shardId);
+
+        /* Split is only allowed for shards with a single placement */
+        Assert(list_length(shardPlacementList) == 1);
+        placement = linitial(shardPlacementList);
+
+        WorkerNode *workerNode = FindNodeWithNodeId(placement->nodeId,
+                                                    false /* missingOk */);
+        if (!workerNode)
+        {
+            ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                            errmsg("Node with group id %d for shard placement "
+                                   "%ld does not exist", placement->groupId, shardId)));
+        }
+    }
+    else
+    {
+        /*
+         * This function ensures that the source group exists, hence we
+         * call it from this code block.
+         */
+        List *shardPlacementList = ActiveShardPlacementList(shardId);
+
+        /* Split is only allowed for shards with a single placement */
+        Assert(list_length(shardPlacementList) == 1);
+        placement = linitial(shardPlacementList);
+    }
+
+    /*
+     * Updating pg_dist_placement ensures that the node with targetGroupId
+     * exists and this is the only placement on that group.
+     */
+    if (placement == NULL)
+    {
+        ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                        errmsg("Active placement for shard %ld is not "
+                               "found", shardId)));
+    }
+
+    UpdateShardState(shardId, newState);
+    UpdateShardPlacementState(placement->placementId, newState);
+
+    PG_RETURN_VOID();
+}
+
+
 /*
  * citus_internal_delete_shard_metadata is an internal UDF to
  * delete a row in pg_dist_shard and corresponding placement rows
@@ -1198,6 +1198,7 @@ CopyShardInterval(ShardInterval *srcInterval)
     destInterval->type = srcInterval->type;
     destInterval->relationId = srcInterval->relationId;
     destInterval->storageType = srcInterval->storageType;
+    destInterval->shardState = srcInterval->shardState;
     destInterval->valueTypeId = srcInterval->valueTypeId;
     destInterval->valueTypeLen = srcInterval->valueTypeLen;
     destInterval->valueByVal = srcInterval->valueByVal;
@@ -1314,6 +1315,17 @@ IsActiveShardPlacement(ShardPlacement *shardPlacement)
 }
 
 
+/*
+ * IsActiveShardInterval checks if the shard interval is labelled as
+ * active.
+ */
+bool
+IsActiveShardInterval(ShardInterval *shardInterval)
+{
+    return (shardInterval->shardState == SHARD_STATE_ACTIVE);
+}
+
+
 /*
  * FilterShardPlacementList filters a list of shard placements based on a filter.
  * Keep only the shard for which the filter function returns true.
@@ -1633,7 +1645,7 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
  */
 void
 InsertShardRow(Oid relationId, uint64 shardId, char storageType,
-               text *shardMinValue, text *shardMaxValue)
+               int shardState, text *shardMinValue, text *shardMaxValue)
 {
     Datum values[Natts_pg_dist_shard];
     bool isNulls[Natts_pg_dist_shard];
@@ -1661,6 +1673,8 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
         isNulls[Anum_pg_dist_shard_shardmaxvalue - 1] = true;
     }
 
+    values[Anum_pg_dist_shard_shardstate - 1] = Int32GetDatum(shardState);
+
     /* open shard relation and insert new tuple */
     Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock);
@@ -1942,6 +1956,56 @@ DeleteShardPlacementRow(uint64 placementId)
 }
 
 
+/*
+ * UpdateShardState sets the shardState for the shard identified
+ * by shardId.
+ */
+void
+UpdateShardState(uint64 shardId, char shardState)
+{
+    ScanKeyData scanKey[1];
+    int scanKeyCount = 1;
+    bool indexOK = true;
+    Datum values[Natts_pg_dist_shard];
+    bool isnull[Natts_pg_dist_shard];
+    bool replace[Natts_pg_dist_shard];
+    bool colIsNull = false;
+
+    Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock);
+    TupleDesc tupleDescriptor = RelationGetDescr(pgDistShard);
+    ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid,
+                BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
+
+    SysScanDesc scanDescriptor = systable_beginscan(pgDistShard,
+                                                    DistShardShardidIndexId(), indexOK,
+                                                    NULL, scanKeyCount, scanKey);
+
+    HeapTuple heapTuple = systable_getnext(scanDescriptor);
+    if (!HeapTupleIsValid(heapTuple))
+    {
+        ereport(ERROR, (errmsg("could not find valid entry for shard "
+                               UINT64_FORMAT, shardId)));
+    }
+
+    memset(replace, 0, sizeof(replace));
+
+    values[Anum_pg_dist_shard_shardstate - 1] = CharGetDatum(shardState);
+    isnull[Anum_pg_dist_shard_shardstate - 1] = false;
+    replace[Anum_pg_dist_shard_shardstate - 1] = true;
+
+    heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
+    CatalogTupleUpdate(pgDistShard, &heapTuple->t_self, heapTuple);
+
+    Assert(!colIsNull);
+    CitusInvalidateRelcacheByShardId(shardId);
+
+    CommandCounterIncrement();
+
+    systable_endscan(scanDescriptor);
+    table_close(pgDistShard, NoLock);
+}
+
+
 /*
  * UpdateShardPlacementState sets the shardState for the placement identified
  * by placementId.
@@ -195,6 +195,9 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
     /* set shard storage type according to relation type */
     char shardStorageType = ShardStorageType(distributedTableId);
 
+    /* shard state is active by default */
+    ShardState shardState = SHARD_STATE_ACTIVE;
+
     for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++)
     {
         uint32 roundRobinNodeIndex = shardIndex % workerNodeCount;
@@ -214,7 +217,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
         text *minHashTokenText = IntegerToText(shardMinHashToken);
         text *maxHashTokenText = IntegerToText(shardMaxHashToken);
 
-        InsertShardRow(distributedTableId, shardId, shardStorageType,
+        InsertShardRow(distributedTableId, shardId, shardStorageType, shardState,
                        minHashTokenText, maxHashTokenText);
 
         List *currentInsertedShardPlacements = InsertShardPlacementRows(
@@ -262,6 +265,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
     LockRelationOid(sourceRelationId, AccessShareLock);
 
     /* prevent placement changes of the source relation until we colocate with them */
+    /* TODO(niupre): We should only return ACTIVE shards, not ALL shards. */
     List *sourceShardIntervalList = LoadShardIntervalList(sourceRelationId);
     LockShardListMetadata(sourceShardIntervalList, ShareLock);
@@ -283,6 +287,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
         uint64 sourceShardId = sourceShardInterval->shardId;
         uint64 newShardId = GetNextShardId();
 
+
         int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
         int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
         text *shardMinValueText = IntegerToText(shardMinValue);
@@ -290,14 +295,14 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
         List *sourceShardPlacementList = ShardPlacementListWithoutOrphanedPlacements(
             sourceShardId);
 
-        InsertShardRow(targetRelationId, newShardId, targetShardStorageType,
+        const ShardState shardState = SHARD_STATE_ACTIVE;
+        InsertShardRow(targetRelationId, newShardId, targetShardStorageType, shardState,
                        shardMinValueText, shardMaxValueText);
 
         ShardPlacement *sourcePlacement = NULL;
         foreach_ptr(sourcePlacement, sourceShardPlacementList)
         {
             int32 groupId = sourcePlacement->groupId;
-            const ShardState shardState = SHARD_STATE_ACTIVE;
             const uint64 shardSize = 0;
 
             /*
@@ -370,7 +375,10 @@ CreateReferenceTableShard(Oid distributedTableId)
     /* get the next shard id */
     uint64 shardId = GetNextShardId();
 
-    InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue,
+    /* shard state is active by default */
+    ShardState shardState = SHARD_STATE_ACTIVE;
+
+    InsertShardRow(distributedTableId, shardId, shardStorageType, shardState, shardMinValue,
                    shardMaxValue);
 
     List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId,
@@ -22,6 +22,7 @@
 #include "distributed/adaptive_executor.h"
 #include "distributed/colocation_utils.h"
 #include "distributed/metadata_cache.h"
+#include "distributed/metadata_utility.h"
 #include "distributed/shardinterval_utils.h"
 #include "distributed/coordinator_protocol.h"
 #include "distributed/connection_management.h"
@@ -39,6 +40,7 @@
 #include "distributed/shardsplit_logical_replication.h"
 #include "distributed/deparse_shard_query.h"
 #include "distributed/shard_rebalancer.h"
+#include "distributed/shard_cleaner.h"
 #include "postmaster/postmaster.h"
 
 /*
@@ -129,7 +131,7 @@ static void AddDummyShardEntryInMap(HTAB *mapOfDummyShards, uint32 targetNodeId,
 static void DropDummyShards(HTAB *mapOfDummyShardToPlacement);
 static void DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval);
 static uint64 GetNextShardIdForSplitChild(void);
+static void MarkShardListForDrop(List *shardIntervalList);
 
 /* Customize error message strings based on operation type */
 static const char *const SplitOperationName[] =
@@ -143,6 +145,8 @@ static const char *const SplitTargetName[] =
     [ISOLATE_TENANT_TO_NEW_SHARD] = "tenant",
 };
 
+bool DeferShardDeleteOnSplit = false;
+
 /* Function definitions */
 
 /*
@@ -550,12 +554,23 @@ BlockingShardSplit(SplitOperation splitOperation,
      * going forward are part of the same distributed transaction.
      */
 
-    /*
-     * Drop old shards and delete related metadata. Have to do that before
-     * creating the new shard metadata, because there's cross-checks
-     * preventing inconsistent metadata (like overlapping shards).
-     */
-    DropShardList(sourceColocatedShardIntervalList);
+    if (DeferShardDeleteOnSplit)
+    {
+        /*
+         * Defer deletion of the source shards and only mark
+         * the shard metadata for deletion.
+         */
+        MarkShardListForDrop(sourceColocatedShardIntervalList);
+    }
+    else
+    {
+        /*
+         * Drop old shards and delete related metadata. Have to do that before
+         * creating the new shard metadata, because there's cross-checks
+         * preventing inconsistent metadata (like overlapping shards).
+         */
+        DropShardList(sourceColocatedShardIntervalList);
+    }
 
     /* Insert new shard and placement metadata */
     InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
@@ -1013,6 +1028,7 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
                 shardInterval->relationId,
                 shardInterval->shardId,
                 shardInterval->storageType,
+                shardInterval->shardState,
                 IntegerToText(DatumGetInt32(shardInterval->minValue)),
                 IntegerToText(DatumGetInt32(shardInterval->maxValue)));
 
@@ -1131,6 +1147,43 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
     }
 }
 
+
+/*
+ * MarkShardListForDrop marks the given shards and their placements as
+ * SHARD_STATE_TO_DELETE on the coordinator and on mx nodes, without
+ * dropping the shards themselves.
+ */
+static void
+MarkShardListForDrop(List *shardIntervalList)
+{
+    ShardInterval *shardInterval = NULL;
+    foreach_ptr(shardInterval, shardIntervalList)
+    {
+        uint64 shardId = shardInterval->shardId;
+        ShardState newState = SHARD_STATE_TO_DELETE;
+
+        UpdateShardState(shardId, newState);
+        List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
+
+        /* Only single placement allowed (already validated RelationReplicationFactor = 1) */
+        Assert(list_length(shardPlacementList) == 1);
+
+        ShardPlacement *placement = (ShardPlacement *) linitial(shardPlacementList);
+        UpdateShardPlacementState(placement->placementId, newState);
+
+        /* sync metadata with all other worker nodes */
+        bool shouldSyncMetadata = ShouldSyncTableMetadata(shardInterval->relationId);
+        if (shouldSyncMetadata)
+        {
+            StringInfo updateShardCommand = makeStringInfo();
+            appendStringInfo(updateShardCommand,
+                             "SELECT citus_internal_update_shard_and_placement_state_metadata(%ld, %d)",
+                             shardId,
+                             newState);
+
+            SendCommandToWorkersWithMetadata(updateShardCommand->data);
+        }
+    }
+}
+
+
 /*
  * DropShardList drops shards and their metadata from both the coordinator and
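When metadata syncing is enabled, MarkShardListForDrop propagates the state change by running the command it builds above on each metadata worker. A minimal sketch of that command, with a made-up shard id and assuming SHARD_STATE_TO_DELETE maps to the integer value 4:

    SELECT citus_internal_update_shard_and_placement_state_metadata(102008, 4);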
@@ -1412,12 +1465,23 @@ NonBlockingShardSplit(SplitOperation splitOperation,
                                                      shardIntervalToSplit->shardId));
     DropShardSplitReplicationSlots(sourceConnection, replicationSlotInfoList);
 
-    /*
-     * 18) Drop old shards and delete related metadata. Have to do that before
-     * creating the new shard metadata, because there's cross-checks
-     * preventing inconsistent metadata (like overlapping shards).
-     */
-    DropShardList(sourceColocatedShardIntervalList);
+    if (DeferShardDeleteOnSplit)
+    {
+        /*
+         * 18) Defer deletion of the source shards and only mark
+         * the shard metadata for deletion.
+         */
+        MarkShardListForDrop(sourceColocatedShardIntervalList);
+    }
+    else
+    {
+        /*
+         * 18) Drop old shards and delete related metadata. Have to do that before
+         * creating the new shard metadata, because there's cross-checks
+         * preventing inconsistent metadata (like overlapping shards).
+         */
+        DropShardList(sourceColocatedShardIntervalList);
+    }
 
     /* 19) Insert new shard and placement metadata */
     InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
@@ -168,6 +168,9 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
         attemptableNodeCount = ShardReplicationFactor;
     }
 
+    /* shard state is active by default */
+    char shardState = SHARD_STATE_ACTIVE;
+
     /* first retrieve a list of random nodes for shard placements */
     while (candidateNodeIndex < attemptableNodeCount)
     {
@@ -184,7 +187,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
         candidateNodeIndex++;
     }
 
-    InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue);
+    InsertShardRow(relationId, shardId, storageType, shardState, nullMinValue, nullMaxValue);
 
     CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList,
                                            ShardReplicationFactor);
@@ -3942,13 +3942,17 @@ ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterva
 
     Datum firstMin = firstInterval->minValue;
     Datum firstMax = firstInterval->maxValue;
+    char firstState = firstInterval->shardState;
     Datum secondMin = secondInterval->minValue;
     Datum secondMax = secondInterval->maxValue;
+    char secondState = secondInterval->shardState;
 
     FmgrInfo *comparisonFunction = intervalRelation->shardIntervalCompareFunction;
     Oid collation = intervalRelation->partitionColumn->varcollid;
 
-    return ShardIntervalsOverlapWithParams(firstMin, firstMax, secondMin, secondMax,
+    return ShardIntervalsOverlapWithParams(firstMin, firstMax,
+                                           secondMin, secondMax,
+                                           firstState, secondState,
                                            comparisonFunction, collation);
 }
 
@@ -3959,8 +3963,10 @@ ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterva
  * The caller is responsible to ensure the input shard min/max values are not NULL.
  */
 bool
-ShardIntervalsOverlapWithParams(Datum firstMin, Datum firstMax, Datum secondMin,
-                                Datum secondMax, FmgrInfo *comparisonFunction,
+ShardIntervalsOverlapWithParams(Datum firstMin, Datum firstMax,
+                                Datum secondMin, Datum secondMax,
+                                char firstState, char secondState,
+                                FmgrInfo *comparisonFunction,
                                 Oid collation)
 {
     /*
@@ -3976,7 +3982,11 @@ ShardIntervalsOverlapWithParams(Datum firstMin, Datum firstMax, Datum secondMin,
     int firstComparison = DatumGetInt32(firstDatum);
     int secondComparison = DatumGetInt32(secondDatum);
 
-    if (firstComparison < 0 || secondComparison < 0)
+    /* if one of the shards is marked as TO_DELETE, ignore the overlap */
+    bool markedForDelete = (firstState == SHARD_STATE_TO_DELETE ||
+                            secondState == SHARD_STATE_TO_DELETE);
+
+    if (firstComparison < 0 || secondComparison < 0 || markedForDelete)
     {
         return false;
     }
@@ -880,6 +880,25 @@ RegisterCitusConfigVariables(void)
         0,
         NULL, NULL, NULL);
 
+    DefineCustomBoolVariable(
+        "citus.defer_drop_after_shard_split",
+        gettext_noop("When enabled a shard split will mark the original shards "
+                     "for deletion after a successful split, instead of deleting "
+                     "them right away."),
+        gettext_noop("The deletion of a shard can sometimes run into a conflict with "
+                     "long running transactions on the shard during the drop phase of "
+                     "the shard split. This causes some splits to be rolled back after "
+                     "resources have been spent on splitting the shard. To prevent "
+                     "conflicts this feature lets you skip the actual deletion until a "
+                     "later point in time. When used one should set "
+                     "citus.defer_shard_delete_interval to make sure deferred deletions "
+                     "will be executed."),
+        &DeferShardDeleteOnSplit,
+        false,
+        PGC_USERSET,
+        0,
+        NULL, NULL, NULL);
+
     DefineCustomIntVariable(
         "citus.defer_shard_delete_interval",
         gettext_noop("Sets the time to wait between background deletion for shards."),
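The new GUC is off by default and can be enabled per session or cluster-wide; a minimal usage sketch (the interval value is an arbitrary example, in milliseconds):

    SET citus.defer_drop_after_shard_split TO on;
    -- deferred source shards are removed later in the background,
    -- so also configure the deletion interval as the GUC description suggests
    SET citus.defer_shard_delete_interval TO 60000;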
@@ -72,3 +72,8 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
 #include "udfs/worker_copy_table_to_node/11.1-1.sql"
 #include "udfs/worker_split_shard_replication_setup/11.1-1.sql"
 #include "udfs/citus_isolation_test_session_is_blocked/11.1-1.sql"
+
+-- Changes for Shard Split Deferred Drop (default value SHARD_STATE_ACTIVE)
+#include "udfs/citus_internal_add_shard_metadata/11.1-1.sql"
+#include "udfs/citus_internal_update_shard_and_placement_state_metadata/11.1-1.sql"
+ALTER TABLE pg_catalog.pg_dist_shard ADD COLUMN shardstate INT NOT NULL DEFAULT 1;
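Existing rows pick up the column default of 1, which corresponds to SHARD_STATE_ACTIVE per the comment above. A quick way to list shards that a split has marked for deferred deletion, assuming TO_DELETE is stored as any value other than 1:

    SELECT logicalrelid, shardid, shardstate
    FROM pg_catalog.pg_dist_shard
    WHERE shardstate <> 1;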
@@ -89,3 +89,17 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
 
 DROP VIEW pg_catalog.citus_locks;
 DROP FUNCTION pg_catalog.citus_locks();
+
+-- Changes for Shard Split Deferred Drop
+CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
+    relation_id regclass, shard_id bigint,
+    storage_type "char", shard_min_value text,
+    shard_max_value text
+)
+RETURNS void
+LANGUAGE C
+AS 'MODULE_PATHNAME';
+COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text) IS
+    'Inserts into pg_dist_shard with user checks';
+ALTER TABLE pg_catalog.pg_dist_shard DROP COLUMN shardstate;
+DROP FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(shard_id bigint, shardState integer);
src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/11.1-1.sql (new generated file, 10 lines)
@@ -0,0 +1,10 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
+    relation_id regclass, shard_id bigint,
+    storage_type "char", shardstate integer,
+    shard_min_value text, shard_max_value text
+)
+RETURNS void
+LANGUAGE C
+AS 'MODULE_PATHNAME';
+COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", integer, text, text) IS
+    'Inserts into pg_dist_shard with user checks';
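For reference, this signature matches the command shape that ShardListInsertCommand now generates during metadata sync; the UDF is guarded by the coordinator-initiated-operation checks, so it is not meant to be called by hand. A sketch with hypothetical values (relation name, shard id, storage type character, and hash bounds are all made up):

    WITH shard_data(relationname, shardid, storagetype, shardstate, shardminvalue, shardmaxvalue)
      AS (VALUES ('public.events'::regclass, 102010, 't'::"char", 1, '-2147483648', '-1'))
    SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardstate,
                                             shardminvalue, shardmaxvalue)
    FROM shard_data;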
@@ -0,0 +1,8 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(
+    shard_id bigint,
+    shardState integer)
+RETURNS void
+LANGUAGE C STRICT
+AS 'MODULE_PATHNAME';
+COMMENT ON FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(bigint, integer) IS
+    'Updates into pg_dist_shard and pg_dist_placement with user checks';
@@ -0,0 +1,8 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(
+    shard_id bigint,
+    shardState integer)
+RETURNS void
+LANGUAGE C STRICT
+AS 'MODULE_PATHNAME';
+COMMENT ON FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(bigint, integer) IS
+    'Updates into pg_dist_shard and pg_dist_placement with user checks';
@@ -224,8 +224,9 @@ create_monolithic_shard_row(PG_FUNCTION_ARGS)
     text *minInfoText = cstring_to_text(minInfo->data);
     text *maxInfoText = cstring_to_text(maxInfo->data);
 
-    InsertShardRow(distributedTableId, newShardId, SHARD_STORAGE_TABLE, minInfoText,
-                   maxInfoText);
+    InsertShardRow(distributedTableId, newShardId,
+                   SHARD_STORAGE_TABLE, SHARD_STATE_ACTIVE,
+                   minInfoText, maxInfoText);
 
     PG_RETURN_INT64(newShardId);
 }
@@ -67,6 +67,7 @@ typedef struct ShardInterval
     Datum maxValue;     /* a shard's typed max value datum */
     uint64 shardId;
     int shardIndex;
+    ShardState shardState;
 } ShardInterval;
 
 
@@ -220,6 +221,7 @@ extern ShardInterval * CopyShardInterval(ShardInterval *srcInterval);
 extern uint64 ShardLength(uint64 shardId);
 extern bool NodeGroupHasShardPlacements(int32 groupId,
                                         bool onlyConsiderActivePlacements);
+extern bool IsActiveShardInterval(ShardInterval *shardInterval);
 extern bool IsActiveShardPlacement(ShardPlacement *ShardPlacement);
 extern List * FilterShardPlacementList(List *shardPlacementList, bool (*filter)(
                                            ShardPlacement *));
@@ -239,7 +241,7 @@ extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
 
 /* Function declarations to modify shard and shard placement data */
 extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
-                           text *shardMinValue, text *shardMaxValue);
+                           int shardState, text *shardMinValue, text *shardMaxValue);
 extern void DeleteShardRow(uint64 shardId);
 extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
                                       char shardState, uint64 shardLength,
@@ -250,6 +252,7 @@ extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
 extern void UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted);
 extern void DeletePartitionRow(Oid distributedRelationId);
 extern void DeleteShardRow(uint64 shardId);
+extern void UpdateShardState(uint64 shardId, char shardState);
 extern void UpdateShardPlacementState(uint64 placementId, char shardState);
 extern void UpdatePlacementGroupId(uint64 placementId, int groupId);
 extern void DeleteShardPlacementRow(uint64 placementId);
@@ -560,6 +560,7 @@ extern bool ShardIntervalsOverlap(ShardInterval *firstInterval,
                                   ShardInterval *secondInterval);
 extern bool ShardIntervalsOverlapWithParams(Datum firstMin, Datum firstMax,
                                             Datum secondMin, Datum secondMax,
+                                            char firstState, char secondState,
                                             FmgrInfo *comparisonFunction,
                                             Oid collation);
 extern bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId);
@@ -43,13 +43,14 @@ typedef FormData_pg_dist_shard *Form_pg_dist_shard;
  * compiler constants for pg_dist_shards
  * ----------------
  */
-#define Natts_pg_dist_shard 6
+#define Natts_pg_dist_shard 7
 #define Anum_pg_dist_shard_logicalrelid 1
 #define Anum_pg_dist_shard_shardid 2
 #define Anum_pg_dist_shard_shardstorage 3
 #define Anum_pg_dist_shard_shardalias_DROPPED 4
 #define Anum_pg_dist_shard_shardminvalue 5
 #define Anum_pg_dist_shard_shardmaxvalue 6
+#define Anum_pg_dist_shard_shardstate 7
 
 /*
  * Valid values for shard storage types include foreign table, (standard) table
@@ -14,6 +14,7 @@
 /* GUC to configure deferred shard deletion */
 extern int DeferShardDeleteInterval;
 extern bool DeferShardDeleteOnMove;
+extern bool DeferShardDeleteOnSplit;
 extern double DesiredPercentFreeAfterMove;
 extern bool CheckAvailableSpaceBeforeMove;
 