mirror of https://github.com/citusdata/citus.git
Simplified Deferred Drop
commit 0653200151, parent 43c2a1e88b
@@ -1267,9 +1267,13 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId,
 	/* set shard storage type according to relation type */
 	char shardStorageType = ShardStorageType(citusLocalTableId);
 
+	/* Default Shard State is Active */
+	ShardState shardState = SHARD_STATE_ACTIVE;
+
 	text *shardMinValue = NULL;
 	text *shardMaxValue = NULL;
-	InsertShardRow(citusLocalTableId, shardId, shardStorageType,
+	InsertShardRow(citusLocalTableId, shardId,
+	               shardStorageType, shardState,
 	               shardMinValue, shardMaxValue);
 
 	List *nodeList = list_make1(CoordinatorNodeIfAddedAsWorkerOrError());
@@ -160,7 +160,9 @@ PreprocessGrantStmt(Node *node, const char *queryString,
 		ddlJob->taskList = NIL;
 		if (IsCitusTable(relationId))
 		{
-			ddlJob->taskList = DDLTaskList(relationId, ddlString.data);
+			/* Propogate latest policies issue on deleted shards to avoid any potential issues */
+			bool includeOrphanedShards = true;
+			ddlJob->taskList = DDLTaskListExtended(relationId, ddlString.data, includeOrphanedShards);
 		}
 		ddlJobs = lappend(ddlJobs, ddlJob);
@@ -179,7 +179,14 @@ PreprocessRenameStmt(Node *node, const char *renameCommand,
 	DDLJob *ddlJob = palloc0(sizeof(DDLJob));
 	ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, tableRelationId);
 	ddlJob->metadataSyncCommand = renameCommand;
-	ddlJob->taskList = DDLTaskList(tableRelationId, renameCommand);
+
+	/*
+	 * Rename orphaned shards as well, otherwise the shard's name will be different
+	 * from the distributed table. This will cause shard cleaner to fail as we will
+	 * try to delete the orphaned shard with the wrong (new) name.
+	 */
+	bool includeOrphanedShards = true;
+	ddlJob->taskList = DDLTaskListExtended(tableRelationId, renameCommand, includeOrphanedShards);
 
 	return list_make1(ddlJob);
 }
@@ -1626,7 +1626,7 @@ InvalidateForeignKeyGraphForDDL(void)
  * given list of shards.
  */
 List *
-DDLTaskList(Oid relationId, const char *commandString)
+DDLTaskListExtended(Oid relationId, const char *commandString, bool includeOrphanedShards)
 {
 	List *taskList = NIL;
 	List *shardIntervalList = LoadShardIntervalList(relationId);
@@ -1670,6 +1670,18 @@ DDLTaskList(Oid relationId, const char *commandString)
 }
 
 
+/*
+ * DDLTaskList builds a list of tasks to execute a DDL command on a
+ * given list of shards.
+ */
+List *
+DDLTaskList(Oid relationId, const char *commandString)
+{
+	bool includeOrphanedShards = false;
+	return DDLTaskListExtended(relationId, commandString, includeOrphanedShards);
+}
+
+
 /*
  * NodeDDLTaskList builds a list of tasks to execute a DDL command on a
  * given target set of nodes.
@ -225,7 +225,7 @@ static int32 LocalNodeId = -1;
|
||||||
|
|
||||||
/* built first time through in InitializeDistCache */
|
/* built first time through in InitializeDistCache */
|
||||||
static ScanKeyData DistPartitionScanKey[1];
|
static ScanKeyData DistPartitionScanKey[1];
|
||||||
static ScanKeyData DistShardScanKey[1];
|
static ScanKeyData DistShardScanKey[2];
|
||||||
static ScanKeyData DistObjectScanKey[3];
|
static ScanKeyData DistObjectScanKey[3];
|
||||||
|
|
||||||
|
|
||||||
|
@@ -1590,7 +1590,9 @@ BuildCachedShardList(CitusTableCacheEntry *cacheEntry)
 	                                  &intervalTypeId,
 	                                  &intervalTypeMod);
 
-	List *distShardTupleList = LookupDistShardTuples(cacheEntry->relationId);
+	/* Only load Active shards in the cache */
+	bool activeShardsOnly = true;
+	List *distShardTupleList = LookupDistShardTuples(cacheEntry->relationId, activeShardsOnly);
 	int shardIntervalArrayLength = list_length(distShardTupleList);
 	if (shardIntervalArrayLength > 0)
 	{
@@ -3596,6 +3598,14 @@ InitializeDistCache(void)
 	DistShardScanKey[0].sk_collation = InvalidOid;
 	DistShardScanKey[0].sk_attno = Anum_pg_dist_shard_logicalrelid;
 
+	fmgr_info_cxt(F_OIDEQ,
+	              &DistShardScanKey[1].sk_func,
+	              MetadataCacheMemoryContext);
+	DistShardScanKey[1].sk_strategy = BTLessStrategyNumber;
+	DistShardScanKey[1].sk_subtype = InvalidOid;
+	DistShardScanKey[1].sk_collation = InvalidOid;
+	DistShardScanKey[1].sk_attno = Anum_pg_dist_shard_shardstate;
+
 	CreateDistTableCache();
 	CreateShardIdCache();
 
@@ -4539,18 +4549,20 @@ LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId)
  * specified relation.
  */
 List *
-LookupDistShardTuples(Oid relationId)
+LookupDistShardTuples(Oid relationId, bool activeShardsOnly)
 {
 	List *distShardTupleList = NIL;
-	ScanKeyData scanKey[1];
+	ScanKeyData scanKey[2];
 
 	Relation pgDistShard = table_open(DistShardRelationId(), AccessShareLock);
 
 	/* copy scankey to local copy, it will be modified during the scan */
 	scanKey[0] = DistShardScanKey[0];
+	scanKey[1] = DistShardScanKey[1];
 
 	/* set scan arguments */
 	scanKey[0].sk_argument = ObjectIdGetDatum(relationId);
+	scanKey[1].sk_argument = (activeShardsOnly) ? SHARD_STATE_ACTIVE : SHARD_STATE_INVALID_LAST;
 
 	SysScanDesc scanDescriptor = systable_beginscan(pgDistShard,
 	                                                DistShardLogicalRelidIndexId(), true,
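The second scan key is meant to filter pg_dist_shard rows by shardstate (per the comment in BuildCachedShardList, only active shards should reach the cache). A rough SQL approximation of the activeShardsOnly = true scan, for illustration only — my_table is a hypothetical relation, and the real filtering happens inside the catalog scan above:

    SELECT shardid
    FROM pg_catalog.pg_dist_shard
    WHERE logicalrelid = 'my_table'::regclass
      AND shardstate = 1;  -- 1 = SHARD_STATE_ACTIVE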
@ -4815,9 +4827,12 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
|
||||||
maxValueExists = true;
|
maxValueExists = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int shardState = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstate - 1]);
|
||||||
|
|
||||||
ShardInterval *shardInterval = CitusMakeNode(ShardInterval);
|
ShardInterval *shardInterval = CitusMakeNode(ShardInterval);
|
||||||
shardInterval->relationId = relationId;
|
shardInterval->relationId = relationId;
|
||||||
shardInterval->storageType = storageType;
|
shardInterval->storageType = storageType;
|
||||||
|
shardInterval->shardState = shardState;
|
||||||
shardInterval->valueTypeId = intervalTypeId;
|
shardInterval->valueTypeId = intervalTypeId;
|
||||||
shardInterval->valueTypeLen = intervalTypeLen;
|
shardInterval->valueTypeLen = intervalTypeLen;
|
||||||
shardInterval->valueByVal = intervalByVal;
|
shardInterval->valueByVal = intervalByVal;
|
||||||
|
|
|
@@ -132,9 +132,9 @@ static void EnsurePartitionMetadataIsSane(Oid relationId, char distributionMetho
                                           int colocationId, char replicationModel,
                                           Var *distributionKey);
 static void EnsureCoordinatorInitiatedOperation(void);
-static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
-                                      text *shardMinValue,
-                                      text *shardMaxValue);
+static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId,
+                                      char storageType, int shardState,
+                                      text *shardMinValue, text *shardMaxValue);
 static void EnsureShardPlacementMetadataIsSane(Oid relationId, int64 shardId,
                                                int64 placementId, int32 shardState,
                                                int64 shardLength, int32 groupId);
@@ -162,6 +162,7 @@ PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata);
+PG_FUNCTION_INFO_V1(citus_internal_update_shard_and_placement_state_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_update_relation_colocation);
 PG_FUNCTION_INFO_V1(citus_internal_add_object_metadata);
@@ -1295,7 +1296,7 @@ ShardListInsertCommand(List *shardIntervalList)
 	/* now add shards to insertShardCommand */
 	StringInfo insertShardCommand = makeStringInfo();
 	appendStringInfo(insertShardCommand,
-	                 "WITH shard_data(relationname, shardid, storagetype, "
+	                 "WITH shard_data(relationname, shardid, storagetype, shardstate, "
 	                 "shardminvalue, shardmaxvalue) AS (VALUES ");
 
 	foreach_ptr(shardInterval, shardIntervalList)
@@ -1328,10 +1329,11 @@ ShardListInsertCommand(List *shardIntervalList)
 		}
 
 		appendStringInfo(insertShardCommand,
-		                 "(%s::regclass, %ld, '%c'::\"char\", %s, %s)",
+		                 "(%s::regclass, %ld, '%c'::\"char\", %d, %s, %s)",
 		                 quote_literal_cstr(qualifiedRelationName),
 		                 shardId,
 		                 shardInterval->storageType,
+		                 shardInterval->shardState,
 		                 minHashToken->data,
 		                 maxHashToken->data);
 
@@ -1345,7 +1347,7 @@ ShardListInsertCommand(List *shardIntervalList)
 
 	appendStringInfo(insertShardCommand,
 	                 "SELECT citus_internal_add_shard_metadata(relationname, shardid, "
-	                 "storagetype, shardminvalue, shardmaxvalue) "
+	                 "storagetype, shardstate, shardminvalue, shardmaxvalue) "
 	                 "FROM shard_data;");
 
 /*
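With the shardstate column in place, the metadata-sync command assembled by ShardListInsertCommand comes out roughly as follows for a single hash shard (the shard id and hash bounds are illustrative):

    WITH shard_data(relationname, shardid, storagetype, shardstate,
                    shardminvalue, shardmaxvalue)
      AS (VALUES ('public.my_table'::regclass, 102008, 't'::"char", 1,
                  '-2147483648', '-1'))
    SELECT citus_internal_add_shard_metadata(relationname, shardid,
                                             storagetype, shardstate,
                                             shardminvalue, shardmaxvalue)
    FROM shard_data;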
@@ -3196,16 +3198,19 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
 	PG_ENSURE_ARGNOTNULL(2, "storage type");
 	char storageType = PG_GETARG_CHAR(2);
 
+	PG_ENSURE_ARGNOTNULL(3, "shardstate");
+	ShardState shardState = PG_GETARG_INT32(3);
+
 	text *shardMinValue = NULL;
-	if (!PG_ARGISNULL(3))
+	if (!PG_ARGISNULL(4))
 	{
-		shardMinValue = PG_GETARG_TEXT_P(3);
+		shardMinValue = PG_GETARG_TEXT_P(4);
 	}
 
 	text *shardMaxValue = NULL;
-	if (!PG_ARGISNULL(4))
+	if (!PG_ARGISNULL(5))
 	{
-		shardMaxValue = PG_GETARG_TEXT_P(4);
+		shardMaxValue = PG_GETARG_TEXT_P(5);
 	}
 
 	/* only owner of the table (or superuser) is allowed to add the Citus metadata */
@@ -3224,11 +3229,14 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
 		 * not sane, the user can only affect its own tables. Given that the
 		 * user is owner of the table, we should allow.
 		 */
-		EnsureShardMetadataIsSane(relationId, shardId, storageType, shardMinValue,
-		                          shardMaxValue);
+		EnsureShardMetadataIsSane(relationId, shardId,
+		                          storageType, shardState,
+		                          shardMinValue, shardMaxValue);
 	}
 
-	InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue);
+	InsertShardRow(relationId, shardId,
+	               storageType, shardState,
+	               shardMinValue, shardMaxValue);
 
 	PG_RETURN_VOID();
 }
@@ -3262,7 +3270,8 @@ EnsureCoordinatorInitiatedOperation(void)
  * for inserting into pg_dist_shard metadata.
  */
 static void
-EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
+EnsureShardMetadataIsSane(Oid relationId, int64 shardId,
+                          char storageType, int shardState,
                           text *shardMinValue, text *shardMaxValue)
 {
 	if (shardId <= INVALID_SHARD_ID)
@@ -3278,6 +3287,13 @@ EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
 		                errmsg("Invalid shard storage type: %c", storageType)));
 	}
 
+	if (!(shardState == SHARD_STATE_ACTIVE ||
+	      shardState == SHARD_STATE_TO_DELETE))
+	{
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+		                errmsg("Invalid shard state type: %c", shardState)));
+	}
+
 	char partitionMethod = PartitionMethodViaCatalog(relationId);
 	if (partitionMethod == DISTRIBUTE_BY_INVALID)
 	{
@@ -3296,7 +3312,9 @@ EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
 		                       "reference and local tables: %c", partitionMethod)));
 	}
 
-	List *distShardTupleList = LookupDistShardTuples(relationId);
+
+	bool activeShardsOnly = true;
+	List *distShardTupleList = LookupDistShardTuples(relationId, activeShardsOnly);
 	if (partitionMethod == DISTRIBUTE_BY_NONE)
 	{
 		if (shardMinValue != NULL || shardMaxValue != NULL)
@@ -3574,6 +3592,84 @@ citus_internal_update_placement_metadata(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * citus_internal_update_shard_and_placement_state_metadata is an internal UDF to
+ * update shardState value for Shards and Placements in pg_dist_shard and pg_dist_placement
+ * catalogs.
+ */
+Datum
+citus_internal_update_shard_and_placement_state_metadata(PG_FUNCTION_ARGS)
+{
+	CheckCitusVersion(ERROR);
+
+	int64 shardId = PG_GETARG_INT64(0);
+	ShardState newState = PG_GETARG_INT32(1);
+
+	ShardPlacement *placement = NULL;
+	if (!ShouldSkipMetadataChecks())
+	{
+		/* this UDF is not allowed allowed for executing as a separate command */
+		EnsureCoordinatorInitiatedOperation();
+
+		if (!ShardExists(shardId))
+		{
+			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			                errmsg("Shard id does not exists: %ld", shardId)));
+		}
+
+		bool missingOk = false;
+		EnsureShardOwner(shardId, missingOk);
+
+		/*
+		 * This function ensures that the source group exists hence we
+		 * call it from this code-block.
+		 */
+		List *shardPlacementList = ActiveShardPlacementList(shardId);
+
+		/* Split is only allowed for shards with a single placement */
+		Assert(list_length(shardPlacementList) == 1);
+		placement = linitial(shardPlacementList);
+
+		WorkerNode *workerNode = FindNodeWithNodeId(placement->nodeId,
+		                                            false /* missingOk */);
+		if (!workerNode)
+		{
+			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			                errmsg("Node with group id %d for shard placement "
+			                       "%ld does not exist", workerNode->groupId, shardId)));
+		}
+	}
+	else
+	{
+		/*
+		 * This function ensures that the source group exists hence we
+		 * call it from this code-block.
+		 */
+		List *shardPlacementList = ActiveShardPlacementList(shardId);
+
+		/* Split is only allowed for shards with a single placement */
+		Assert(list_length(shardPlacementList) == 1);
+		placement = linitial(shardPlacementList);
+	}
+
+	/*
+	 * Updating pg_dist_placement ensures that the node with targetGroupId
+	 * exists and this is the only placement on that group.
+	 */
+	if (placement == NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+		                errmsg("Active placement for shard %ld is not "
+		                       "found", shardId)));
+	}
+
+	UpdateShardState(shardId, newState);
+	UpdateShardPlacementState(placement->placementId, newState);
+
+	PG_RETURN_VOID();
+}
+
+
 /*
  * citus_internal_delete_shard_metadata is an internal UDF to
  * delete a row in pg_dist_shard and corresponding placement rows
@@ -55,6 +55,7 @@
 #include "distributed/worker_manager.h"
 #include "distributed/worker_protocol.h"
 #include "distributed/version_compat.h"
+#include "distributed/shardinterval_utils.h"
 #include "nodes/makefuncs.h"
 #include "parser/scansup.h"
 #include "storage/lmgr.h"
@@ -1070,7 +1071,48 @@ LoadShardIntervalList(Oid relationId)
 
 
 /*
- * LoadUnsortedShardIntervalListViaCatalog returns a list of shard intervals related for a
+ * LoadShardIntervalListIncludingOrphansViaCatalog returns a list of sorted shard intervals
+ * for a given distributed table. The function returns an empty list if no shards can be found
+ * for the given relation.
+ *
+ * This function does not use CitusTableCache and instead reads from catalog tables
+ * directly.
+ */
+List *
+LoadShardIntervalListIncludingOrphansViaCatalog(Oid relationId)
+{
+	List *shardIntervalList = LoadUnsortedShardIntervalListIncludingOrphansViaCatalog(relationId);
+
+	// Transform into a temporary array to sort.
+	ShardInterval **shardIntervalArray = (ShardInterval **) PointerArrayFromList(shardIntervalList);
+	int shardIntervalArrayLength = list_length(shardIntervalList);
+
+	/*
+	 * Although we are loading intervals from catalog, use cache to
+	 * get the partition column and compare function.
+	 */
+	CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
+	ShardInterval **sortedShardIntervalArray = NULL;
+	if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
+	{
+		sortedShardIntervalArray = shardIntervalArray;
+	}
+	else
+	{
+		sortedShardIntervalArray = SortShardIntervalArray(shardIntervalArray,
+		                                                  shardIntervalArrayLength,
+		                                                  cacheEntry->partitionColumn->
+		                                                  varcollid,
+		                                                  cacheEntry->shardIntervalCompareFunction);
+	}
+
+	List *sortedShardIntervalList = ShardArrayToList(sortedShardIntervalArray, shardIntervalArrayLength);
+	return sortedShardIntervalList;
+}
+
+
+/*
+ * LoadUnsortedShardIntervalListIncludingOrphansViaCatalog returns a list of shard intervals related for a
  * given distributed table. The function returns an empty list if no shards can be found
  * for the given relation.
 *
@@ -1078,10 +1120,12 @@ LoadShardIntervalList(Oid relationId)
  * directly.
  */
 List *
-LoadUnsortedShardIntervalListViaCatalog(Oid relationId)
+LoadUnsortedShardIntervalListIncludingOrphansViaCatalog(Oid relationId)
 {
 	List *shardIntervalList = NIL;
-	List *distShardTuples = LookupDistShardTuples(relationId);
+
+	bool activeShardsOnly = false;
+	List *distShardTuples = LookupDistShardTuples(relationId, activeShardsOnly);
 	Relation distShardRelation = table_open(DistShardRelationId(), AccessShareLock);
 	TupleDesc distShardTupleDesc = RelationGetDescr(distShardRelation);
 	Oid intervalTypeId = InvalidOid;
@@ -1198,6 +1242,7 @@ CopyShardInterval(ShardInterval *srcInterval)
 	destInterval->type = srcInterval->type;
 	destInterval->relationId = srcInterval->relationId;
 	destInterval->storageType = srcInterval->storageType;
+	destInterval->shardState = srcInterval->shardState;
 	destInterval->valueTypeId = srcInterval->valueTypeId;
 	destInterval->valueTypeLen = srcInterval->valueTypeLen;
 	destInterval->valueByVal = srcInterval->valueByVal;
@@ -1632,7 +1677,8 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
  * null min/max values in case they are creating an empty shard.
  */
 void
-InsertShardRow(Oid relationId, uint64 shardId, char storageType,
+InsertShardRow(Oid relationId, uint64 shardId,
+               char storageType, int shardState,
                text *shardMinValue, text *shardMaxValue)
 {
 	Datum values[Natts_pg_dist_shard];
@@ -1661,6 +1707,8 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
 		isNulls[Anum_pg_dist_shard_shardmaxvalue - 1] = true;
 	}
 
+	values[Anum_pg_dist_shard_shardstate - 1] = Int32GetDatum(shardState);
+
 	/* open shard relation and insert new tuple */
 	Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock);
 
@@ -1942,6 +1990,55 @@ DeleteShardPlacementRow(uint64 placementId)
 }
 
 
+/*
+ * UpdateShardState sets the shardState for the shard identified by shardId.
+ */
+void
+UpdateShardState(uint64 shardId, char shardState)
+{
+	ScanKeyData scanKey[1];
+	int scanKeyCount = 1;
+	bool indexOK = true;
+	Datum values[Natts_pg_dist_shard];
+	bool isnull[Natts_pg_dist_shard];
+	bool replace[Natts_pg_dist_shard];
+	bool colIsNull = false;
+
+	Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock);
+	TupleDesc tupleDescriptor = RelationGetDescr(pgDistShard);
+	ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid,
+	            BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
+
+	SysScanDesc scanDescriptor = systable_beginscan(pgDistShard,
+	                                                DistShardShardidIndexId(), indexOK,
+	                                                NULL, scanKeyCount, scanKey);
+
+	HeapTuple heapTuple = systable_getnext(scanDescriptor);
+	if (!HeapTupleIsValid(heapTuple))
+	{
+		ereport(ERROR, (errmsg("could not find valid entry for shard "
+		                       UINT64_FORMAT, shardId)));
+	}
+
+	memset(replace, 0, sizeof(replace));
+
+	values[Anum_pg_dist_shard_shardstate - 1] = CharGetDatum(shardState);
+	isnull[Anum_pg_dist_shard_shardstate - 1] = false;
+	replace[Anum_pg_dist_shard_shardstate - 1] = true;
+
+	heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
+	CatalogTupleUpdate(pgDistShard, &heapTuple->t_self, heapTuple);
+
+	Assert(!colIsNull);
+	CitusInvalidateRelcacheByShardId(shardId);
+
+	CommandCounterIncrement();
+
+	systable_endscan(scanDescriptor);
+	table_close(pgDistShard, NoLock);
+}
+
+
 /*
  * UpdateShardPlacementState sets the shardState for the placement identified
  * by placementId.
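In effect, UpdateShardState performs the following direct catalog update, shown only to convey the semantics — the C code goes through the heap/index APIs, invalidates the relcache entry for the shard, and bumps the command counter; the shard id is hypothetical:

    UPDATE pg_catalog.pg_dist_shard
    SET shardstate = 4       -- e.g. SHARD_STATE_TO_DELETE
    WHERE shardid = 102008;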
@ -195,6 +195,9 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
|
||||||
/* set shard storage type according to relation type */
|
/* set shard storage type according to relation type */
|
||||||
char shardStorageType = ShardStorageType(distributedTableId);
|
char shardStorageType = ShardStorageType(distributedTableId);
|
||||||
|
|
||||||
|
/* shard state is active by default */
|
||||||
|
ShardState shardState = SHARD_STATE_ACTIVE;
|
||||||
|
|
||||||
for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++)
|
for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++)
|
||||||
{
|
{
|
||||||
uint32 roundRobinNodeIndex = shardIndex % workerNodeCount;
|
uint32 roundRobinNodeIndex = shardIndex % workerNodeCount;
|
||||||
|
@@ -214,7 +217,8 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
 		text *minHashTokenText = IntegerToText(shardMinHashToken);
 		text *maxHashTokenText = IntegerToText(shardMaxHashToken);
 
-		InsertShardRow(distributedTableId, shardId, shardStorageType,
+		InsertShardRow(distributedTableId, shardId,
+		               shardStorageType, shardState,
 		               minHashTokenText, maxHashTokenText);
 
 		List *currentInsertedShardPlacements = InsertShardPlacementRows(
@@ -289,8 +293,10 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
 		text *shardMaxValueText = IntegerToText(shardMaxValue);
 		List *sourceShardPlacementList = ShardPlacementListWithoutOrphanedPlacements(
 			sourceShardId);
+		const ShardState targetShardState = SHARD_STATE_ACTIVE;
 
-		InsertShardRow(targetRelationId, newShardId, targetShardStorageType,
+		InsertShardRow(targetRelationId, newShardId,
+		               targetShardStorageType, targetShardState,
 		               shardMinValueText, shardMaxValueText);
 
 		ShardPlacement *sourcePlacement = NULL;
@@ -370,8 +376,12 @@ CreateReferenceTableShard(Oid distributedTableId)
 	/* get the next shard id */
 	uint64 shardId = GetNextShardId();
 
-	InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue,
-	               shardMaxValue);
+	/* shard state is active by default */
+	ShardState shardState = SHARD_STATE_ACTIVE;
+
+	InsertShardRow(distributedTableId, shardId,
+	               shardStorageType, shardState,
+	               shardMinValue, shardMaxValue);
 
 	List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId,
 	                                                         nodeList, workerStartIndex,
@@ -139,7 +139,7 @@ citus_drop_all_shards(PG_FUNCTION_ARGS)
 	 */
 	LockRelationOid(relationId, AccessExclusiveLock);
 
-	List *shardIntervalList = LoadUnsortedShardIntervalListViaCatalog(relationId);
+	List *shardIntervalList = LoadUnsortedShardIntervalListIncludingOrphansViaCatalog(relationId);
 	int droppedShardCount = DropShards(relationId, schemaName, relationName,
 	                                   shardIntervalList, dropShardsMetadataOnly);
 
@@ -23,6 +23,7 @@
 #include "distributed/colocation_utils.h"
 #include "distributed/hash_helpers.h"
 #include "distributed/metadata_cache.h"
+#include "distributed/metadata_utility.h"
 #include "distributed/shardinterval_utils.h"
 #include "distributed/coordinator_protocol.h"
 #include "distributed/connection_management.h"
@@ -40,6 +41,7 @@
 #include "distributed/shardsplit_logical_replication.h"
 #include "distributed/deparse_shard_query.h"
 #include "distributed/shard_rebalancer.h"
+#include "distributed/shard_cleaner.h"
 #include "postmaster/postmaster.h"
 
 /*
@@ -135,7 +137,7 @@ static void AddDummyShardEntryInMap(HTAB *mapOfDummyShards, uint32 targetNodeId,
 static void DropDummyShards(HTAB *mapOfDummyShardToPlacement);
 static void DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval);
 static uint64 GetNextShardIdForSplitChild(void);
+static void MarkShardListWithPlacementsForDrop(List *shardIntervalList);
 
 /* Customize error message strings based on operation type */
 static const char *const SplitOperationName[] =
@@ -149,6 +151,8 @@ static const char *const SplitTargetName[] =
 	[ISOLATE_TENANT_TO_NEW_SHARD] = "tenant",
 };
 
+bool DeferShardDeleteOnSplit = false;
+
 /* Function definitions */
 
 /*
@@ -555,14 +559,23 @@ BlockingShardSplit(SplitOperation splitOperation,
 	 * require additional clean-up in case of failure. The remaining operations
 	 * going forward are part of the same distributed transaction.
 	 */
-	/*
-	 * Drop old shards and delete related metadata. Have to do that before
-	 * creating the new shard metadata, because there's cross-checks
-	 * preventing inconsistent metadata (like overlapping shards).
-	 */
-	DropShardList(sourceColocatedShardIntervalList);
+	if (DeferShardDeleteOnSplit)
+	{
+		/*
+		 * Defer deletion of source shard and only mark
+		 * shard metadata for deletion.
+		 */
+		MarkShardListWithPlacementsForDrop(sourceColocatedShardIntervalList);
+	}
+	else
+	{
+		/*
+		 * Drop old shards and delete related metadata. Have to do that before
+		 * creating the new shard metadata, because there's cross-checks
+		 * preventing inconsistent metadata (like overlapping shards).
+		 */
+		DropShardList(sourceColocatedShardIntervalList);
+	}
 
 	/* Insert new shard and placement metdata */
 	InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
 	                                 workersForPlacementList);
@@ -1019,6 +1032,7 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
 				shardInterval->relationId,
 				shardInterval->shardId,
 				shardInterval->storageType,
+				shardInterval->shardState,
 				IntegerToText(DatumGetInt32(shardInterval->minValue)),
 				IntegerToText(DatumGetInt32(shardInterval->maxValue)));
 
@@ -1137,6 +1151,43 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
 }
 
 
+/*
+ * MarkShardListWithPlacementsForDrop update shards and placements metadata from both
+ * the coordinator and mx nodes to TO_DELETE state.
+ */
+static void
+MarkShardListWithPlacementsForDrop(List *shardIntervalList)
+{
+	ShardInterval *shardInterval = NULL;
+	foreach_ptr(shardInterval, shardIntervalList)
+	{
+		uint64 shardId = shardInterval->shardId;
+		ShardState newState = SHARD_STATE_TO_DELETE;
+
+		UpdateShardState(shardId, newState);
+		List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
+
+		/* Only single placement allowed (already validated RelationReplicationFactor = 1) */
+		Assert(list_length(shardPlacementList) == 1);
+
+		ShardPlacement *placement = (ShardPlacement *) linitial(shardPlacementList);
+		UpdateShardPlacementState(placement->placementId, newState);
+
+		/* sync metadata with all other worked nodes */
+		bool shouldSyncMetadata = ShouldSyncTableMetadata(shardInterval->relationId);
+		if (shouldSyncMetadata)
+		{
+			StringInfo updateShardCommand = makeStringInfo();
+			appendStringInfo(updateShardCommand,
+			                 "SELECT citus_internal_update_shard_and_placement_state_metadata(%ld, %d)",
+			                 shardId,
+			                 newState);
+
+			SendCommandToWorkersWithMetadata(updateShardCommand->data);
+		}
+	}
+}
+
+
 /*
  * DropShardList drops shards and their metadata from both the coordinator and
  * mx nodes.
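For a hypothetical shard 102008 being marked, the command this function broadcasts to metadata workers (built by the appendStringInfo above) is:

    SELECT citus_internal_update_shard_and_placement_state_metadata(102008, 4);  -- 4 = SHARD_STATE_TO_DELETE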
@@ -1430,13 +1481,23 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 	/* 19) Drop Publications */
 	DropPublications(sourceConnection, publicationInfoHash);
 
-	/*
-	 * 20) Drop old shards and delete related metadata. Have to do that before
-	 * creating the new shard metadata, because there's cross-checks
-	 * preventing inconsistent metadata (like overlapping shards).
-	 */
-	DropShardList(sourceColocatedShardIntervalList);
+	if (DeferShardDeleteOnSplit)
+	{
+		/*
+		 * 18) Defer deletion of source shard and only mark
+		 * shard metadata for deletion.
+		 */
+		MarkShardListWithPlacementsForDrop(sourceColocatedShardIntervalList);
+	}
+	else
+	{
+		/*
+		 * 18) Drop old shards and delete related metadata. Have to do that before
+		 * creating the new shard metadata, because there's cross-checks
+		 * preventing inconsistent metadata (like overlapping shards).
+		 */
+		DropShardList(sourceColocatedShardIntervalList);
+	}
 
 	/* 21) Insert new shard and placement metdata */
 	InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
|
@ -103,6 +103,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
|
||||||
text *nullMinValue = NULL;
|
text *nullMinValue = NULL;
|
||||||
text *nullMaxValue = NULL;
|
text *nullMaxValue = NULL;
|
||||||
char storageType = SHARD_STORAGE_TABLE;
|
char storageType = SHARD_STORAGE_TABLE;
|
||||||
|
char shardState = SHARD_STATE_ACTIVE;
|
||||||
|
|
||||||
Oid relationId = ResolveRelationId(relationNameText, false);
|
Oid relationId = ResolveRelationId(relationNameText, false);
|
||||||
|
|
||||||
|
@@ -184,7 +185,9 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
 		candidateNodeIndex++;
 	}
 
-	InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue);
+	InsertShardRow(relationId, shardId,
+	               storageType, shardState,
+	               nullMinValue, nullMaxValue);
 
 	CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList,
 	                                       ShardReplicationFactor);
@@ -4543,6 +4543,7 @@ GenerateSyntheticShardIntervalArray(int partitionCount)
 
 		shardInterval->shardId = INVALID_SHARD_ID;
 		shardInterval->valueTypeId = INT4OID;
+		shardInterval->shardState = SHARD_STATE_INVALID_FIRST;
 
 		shardIntervalArray[shardIndex] = shardInterval;
 	}
@@ -263,7 +263,6 @@ static void AddHashRestrictionToInstance(ClauseWalkerContext *context, OpExpr *o
                                          Var *varClause, Const *constantClause);
 static void AddNewConjuction(ClauseWalkerContext *context, PruningTreeNode *node);
 static PruningInstance * CopyPartialPruningInstance(PruningInstance *sourceInstance);
-static List * ShardArrayToList(ShardInterval **shardArray, int length);
 static List * DeepCopyShardIntervalList(List *originalShardIntervalList);
 static int PerformValueCompare(FunctionCallInfo compareFunctionCall, Datum a,
                                Datum b);
@@ -1361,25 +1360,6 @@ CopyPartialPruningInstance(PruningInstance *sourceInstance)
 }
 
 
-/*
- * ShardArrayToList builds a list of out the array of ShardInterval*.
- */
-static List *
-ShardArrayToList(ShardInterval **shardArray, int length)
-{
-	List *shardIntervalList = NIL;
-
-	for (int shardIndex = 0; shardIndex < length; shardIndex++)
-	{
-		ShardInterval *shardInterval =
-			shardArray[shardIndex];
-		shardIntervalList = lappend(shardIntervalList, shardInterval);
-	}
-
-	return shardIntervalList;
-}
-
-
 /*
  * DeepCopyShardIntervalList copies originalShardIntervalList and the
  * contained ShardIntervals, into a new list.
@@ -880,6 +880,25 @@ RegisterCitusConfigVariables(void)
 		0,
 		NULL, NULL, NULL);
 
+	DefineCustomBoolVariable(
+		"citus.defer_drop_after_shard_split",
+		gettext_noop("When enabled a shard split will mark the original shards "
+		             "for deletion after a successful split, instead of deleting "
+		             "them right away."),
+		gettext_noop("The deletion of a shard can sometimes run into a conflict with a "
+		             "long running transactions on a the shard during the drop phase of "
+		             "the shard split. This causes some moves to be rolled back after "
+		             "resources have been spend on splitting the shard. To prevent "
+		             "conflicts this feature lets you skip the actual deletion till a "
+		             "later point in time. When used one should set "
+		             "citus.defer_shard_delete_interval to make sure defered deletions "
+		             "will be executed"),
+		&DeferShardDeleteOnSplit,
+		false,
+		PGC_USERSET,
+		0,
+		NULL, NULL, NULL);
+
 	DefineCustomIntVariable(
 		"citus.defer_shard_delete_interval",
 		gettext_noop("Sets the time to wait between background deletion for shards."),
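A minimal usage sketch for the new GUC, assuming a coordinator running this build; the interval value is an example only:

    -- opt in to deferred drop for shard splits (PGC_USERSET, so per-session works)
    SET citus.defer_drop_after_shard_split TO on;

    -- ensure the background shard cleaner eventually removes the marked shards;
    -- this interval GUC is server-level, value in milliseconds
    ALTER SYSTEM SET citus.defer_shard_delete_interval = 60000;
    SELECT pg_reload_conf();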
@@ -73,3 +73,8 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
 #include "udfs/worker_split_shard_replication_setup/11.1-1.sql"
 #include "udfs/citus_isolation_test_session_is_blocked/11.1-1.sql"
 #include "udfs/replicate_reference_tables/11.1-1.sql"
+
+-- Changes for Shard Split Deferred Drop (default value SHARD_STATE_ACTIVE)
+#include "udfs/citus_internal_add_shard_metadata/11.1-1.sql"
+#include "udfs/citus_internal_update_shard_and_placement_state_metadata/11.1-1.sql"
+ALTER TABLE pg_catalog.pg_dist_shard ADD COLUMN shardstate INT NOT NULL DEFAULT 1;
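After this upgrade step, every pre-existing row in pg_dist_shard picks up the default state 1 (SHARD_STATE_ACTIVE); sources left behind by a deferred split can then be spotted by their state. An illustrative check:

    SELECT logicalrelid, shardid, shardstate
    FROM pg_catalog.pg_dist_shard
    WHERE shardstate <> 1;  -- e.g. 4 = SHARD_STATE_TO_DELETE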
@@ -92,3 +92,8 @@ DROP FUNCTION pg_catalog.citus_locks();
 
 DROP FUNCTION pg_catalog.replicate_reference_tables(citus.shard_transfer_mode);
 #include "../udfs/replicate_reference_tables/9.3-2.sql"
+
+-- Changes for Shard Split Deferred Drop
+#include "../udfs/citus_internal_add_shard_metadata/10.2-1.sql"
+ALTER TABLE pg_catalog.pg_dist_shard DROP COLUMN shardstate;
+DROP FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(shard_id bigint, shardState integer);
src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/11.1-1.sql (new generated file, 10 lines)
@@ -0,0 +1,10 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
+                        relation_id regclass, shard_id bigint,
+                        storage_type "char", shardstate integer,
+                        shard_min_value text, shard_max_value text
+                        )
+    RETURNS void
+    LANGUAGE C
+    AS 'MODULE_PATHNAME';
+COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", integer, text, text) IS
+    'Inserts into pg_dist_shard with user checks';
@@ -1,10 +1,10 @@
 CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
                         relation_id regclass, shard_id bigint,
-                        storage_type "char", shard_min_value text,
-                        shard_max_value text
+                        storage_type "char", shardstate integer,
+                        shard_min_value text, shard_max_value text
                         )
     RETURNS void
     LANGUAGE C
     AS 'MODULE_PATHNAME';
-COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text) IS
+COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", integer, text, text) IS
     'Inserts into pg_dist_shard with user checks';
@@ -0,0 +1,8 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(
+                        shard_id bigint,
+                        shardState integer)
+    RETURNS void
+    LANGUAGE C STRICT
+    AS 'MODULE_PATHNAME';
+COMMENT ON FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(bigint, integer) IS
+    'Updates into pg_dist_shard and pg_dist_placement with user checks';
@@ -0,0 +1,8 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(
+                        shard_id bigint,
+                        shardState integer)
+    RETURNS void
+    LANGUAGE C STRICT
+    AS 'MODULE_PATHNAME';
+COMMENT ON FUNCTION pg_catalog.citus_internal_update_shard_and_placement_state_metadata(bigint, integer) IS
+    'Updates into pg_dist_shard and pg_dist_placement with user checks';
@@ -224,8 +224,9 @@ create_monolithic_shard_row(PG_FUNCTION_ARGS)
 	text *minInfoText = cstring_to_text(minInfo->data);
 	text *maxInfoText = cstring_to_text(maxInfo->data);
 
-	InsertShardRow(distributedTableId, newShardId, SHARD_STORAGE_TABLE, minInfoText,
-	               maxInfoText);
+	InsertShardRow(distributedTableId, newShardId,
+	               SHARD_STORAGE_TABLE, SHARD_STATE_ACTIVE,
+	               minInfoText, maxInfoText);
 
 	PG_RETURN_INT64(newShardId);
 }
@@ -485,3 +485,21 @@ SingleReplicatedTable(Oid relationId)
 
 	return true;
 }
+
+
+/*
+ * ShardArrayToList builds a list of out the array of ShardInterval*.
+ */
+List *
+ShardArrayToList(ShardInterval **shardArray, int length)
+{
+	List *shardIntervalList = NIL;
+
+	for (int shardIndex = 0; shardIndex < length; shardIndex++)
+	{
+		ShardInterval *shardInterval =
+			shardArray[shardIndex];
+		shardIntervalList = lappend(shardIntervalList, shardInterval);
+	}
+
+	return shardIntervalList;
+}
@@ -95,6 +95,7 @@ extern void ProcessUtilityParseTree(Node *node, const char *queryString,
 extern void MarkInvalidateForeignKeyGraph(void);
 extern void InvalidateForeignKeyGraphForDDL(void);
 extern List * DDLTaskList(Oid relationId, const char *commandString);
+extern List * DDLTaskListExtended(Oid relationId, const char *commandString, bool includeOrphanedShards);
 extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands);
 extern bool AlterTableInProgress(void);
 extern bool DropSchemaOrDBInProgress(void);
@@ -149,7 +149,7 @@ extern int GetCitusCreationLevel(void);
 extern bool IsCitusTable(Oid relationId);
 extern bool IsCitusTableViaCatalog(Oid relationId);
 extern char PgDistPartitionViaCatalog(Oid relationId);
-extern List * LookupDistShardTuples(Oid relationId);
+extern List * LookupDistShardTuples(Oid relationId, bool activeShardsOnly);
 extern char PartitionMethodViaCatalog(Oid relationId);
 extern Var * PartitionColumnViaCatalog(Oid relationId);
 extern bool IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel);
@@ -67,6 +67,7 @@ typedef struct ShardInterval
 	Datum maxValue;     /* a shard's typed max value datum */
 	uint64 shardId;
 	int shardIndex;
+	ShardState shardState;
 } ShardInterval;
 
 
@@ -212,7 +213,8 @@ extern Datum citus_relation_size(PG_FUNCTION_ARGS);
 /* Function declarations to read shard and shard placement data */
 extern uint32 TableShardReplicationFactor(Oid relationId);
 extern List * LoadShardIntervalList(Oid relationId);
-extern List * LoadUnsortedShardIntervalListViaCatalog(Oid relationId);
+extern List * LoadShardIntervalListIncludingOrphansViaCatalog(Oid relationId);
+extern List * LoadUnsortedShardIntervalListIncludingOrphansViaCatalog(Oid relationId);
 extern ShardInterval * LoadShardIntervalWithLongestShardName(Oid relationId);
 extern int ShardIntervalCount(Oid relationId);
 extern List * LoadShardList(Oid relationId);
@@ -238,9 +240,11 @@ extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
 extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
 
 /* Function declarations to modify shard and shard placement data */
-extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
+extern void InsertShardRow(Oid relationId, uint64 shardId,
+                           char storageType, int shardState,
                            text *shardMinValue, text *shardMaxValue);
 extern void DeleteShardRow(uint64 shardId);
+extern void UpdateShardState(uint64 shardId, char shardState);
 extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
                                       char shardState, uint64 shardLength,
                                       int32 groupId);
@@ -43,13 +43,14 @@ typedef FormData_pg_dist_shard *Form_pg_dist_shard;
  * compiler constants for pg_dist_shards
  * ----------------
  */
-#define Natts_pg_dist_shard 6
+#define Natts_pg_dist_shard 7
 #define Anum_pg_dist_shard_logicalrelid 1
 #define Anum_pg_dist_shard_shardid 2
 #define Anum_pg_dist_shard_shardstorage 3
 #define Anum_pg_dist_shard_shardalias_DROPPED 4
 #define Anum_pg_dist_shard_shardminvalue 5
 #define Anum_pg_dist_shard_shardmaxvalue 6
+#define Anum_pg_dist_shard_shardstate 7
 
 /*
  * Valid values for shard storage types include foreign table, (standard) table
@@ -30,12 +30,15 @@
  *
  * The numbers assigned per state used for historical reason and should
  * not be changed since they correspond to shardstate in pg_dist_placement.
+ * LookupDistShardTuples also depends on the order of these states.
  */
 typedef enum
 {
 	SHARD_STATE_INVALID_FIRST = 0,
 	SHARD_STATE_ACTIVE = 1,
 	SHARD_STATE_TO_DELETE = 4,
+
+	SHARD_STATE_INVALID_LAST = 5
 } ShardState;
 
 
@@ -14,6 +14,7 @@
 /* GUC to configure deferred shard deletion */
 extern int DeferShardDeleteInterval;
 extern bool DeferShardDeleteOnMove;
+extern bool DeferShardDeleteOnSplit;
 extern double DesiredPercentFreeAfterMove;
 extern bool CheckAvailableSpaceBeforeMove;
 
@@ -55,6 +55,7 @@ extern int SearchCachedShardInterval(Datum partitionColumnValue,
                                      int shardCount, Oid shardIntervalCollation,
                                      FmgrInfo *compareFunction);
 extern bool SingleReplicatedTable(Oid relationId);
+extern List * ShardArrayToList(ShardInterval **shardArray, int length);
 
 
 #endif /* SHARDINTERVAL_UTILS_H_ */