mirror of https://github.com/citusdata/citus.git
Isolate Tenant and Split Shard Unification
parent b502526ef1
commit c38de446bb
@@ -33,8 +33,8 @@ static SplitMode LookupSplitMode(Oid shardSplitModeOid);
  * citus_split_shard_by_split_points(shard_id bigint, split_points integer[], node_ids integer[])
  * Split source shard into multiple shards using the given split points.
  * 'shard_id' is the id of source shard to split.
- * 'split_points' is an array of integers that represents the split points.
- * 'node_ids' is an array of integers that represents the placement node ids of the new shards.
+ * 'split_points' is an array that represents the split points.
+ * 'node_ids' is an array that represents the placement node ids of the new shards.
+ * 'split_mode citus.split_mode' is the mode of split.
  */
 Datum
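The comment above documents the SQL-visible signature of the UDF. For orientation, a C entry point with that signature would unpack its arguments roughly as below. This is a sketch built only from the signature in the comment; the variable names and the `_sketch` suffix are illustrative and not this commit's code.

#include "postgres.h"
#include "fmgr.h"
#include "utils/array.h"

PG_FUNCTION_INFO_V1(citus_split_shard_by_split_points_sketch);

/* Sketch: map the SQL arguments onto C values before delegating to the split logic. */
Datum
citus_split_shard_by_split_points_sketch(PG_FUNCTION_ARGS)
{
    uint64 shardIdToSplit = PG_GETARG_INT64(0);             /* shard_id bigint */
    ArrayType *splitPointsArray = PG_GETARG_ARRAYTYPE_P(1); /* split_points integer[] */
    ArrayType *nodeIdsArray = PG_GETARG_ARRAYTYPE_P(2);     /* node_ids integer[] */
    Oid shardSplitModeOid = PG_GETARG_OID(3);               /* citus.split_mode enum value */

    /* the enum value's oid would then be resolved via LookupSplitMode (next hunk) */
    (void) shardIdToSplit;
    (void) splitPointsArray;
    (void) nodeIdsArray;
    (void) shardSplitModeOid;
    PG_RETURN_VOID();
}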
@@ -66,7 +66,7 @@ citus_split_shard_by_split_points(PG_FUNCTION_ARGS)
 /*
  * LookupSplitMode maps the oids of citus.shard_split_mode enum
- * values to a char.
+ * values to an enum.
  */
 SplitMode
 LookupSplitMode(Oid shardSplitModeOid)
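The oid-to-enum mapping can be done with a pg_enum syscache lookup on the enum value's oid. A minimal sketch, assuming the enum label for blocking mode is "blocking" (the label text is an assumption; only BLOCKING_SPLIT exists at this point in the series):

#include "postgres.h"
#include "catalog/pg_enum.h"
#include "utils/syscache.h"
#include "distributed/shard_split.h"

/* Sketch: resolve a citus.split_mode enum value oid to the SplitMode C enum. */
static SplitMode
LookupSplitModeSketch(Oid shardSplitModeOid)
{
    HeapTuple enumTuple = SearchSysCache1(ENUMOID, ObjectIdGetDatum(shardSplitModeOid));
    if (!HeapTupleIsValid(enumTuple))
    {
        ereport(ERROR, (errmsg("invalid split mode oid %u", shardSplitModeOid)));
    }

    Form_pg_enum enumForm = (Form_pg_enum) GETSTRUCT(enumTuple);
    char *enumLabel = pstrdup(NameStr(enumForm->enumlabel));
    ReleaseSysCache(enumTuple);

    /* "blocking" is an assumed label for the only mode this commit supports */
    if (strcmp(enumLabel, "blocking") == 0)
    {
        return BLOCKING_SPLIT;
    }

    ereport(ERROR, (errmsg("unexpected split mode label \"%s\"", enumLabel)));
    return BLOCKING_SPLIT; /* unreachable */
}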
@@ -32,6 +32,7 @@
 #include "distributed/worker_protocol.h"
 #include "distributed/worker_transaction.h"
 #include "distributed/version_compat.h"
+#include "distributed/shard_split.h"
 #include "nodes/pg_list.h"
 #include "storage/lock.h"
 #include "utils/builtins.h"
@@ -48,7 +49,6 @@ PG_FUNCTION_INFO_V1(worker_hash);

 /* local function forward declarations */
 static uint64 SplitShardByValue(ShardInterval *sourceShard, Datum distributionValueDatum);
-static void ErrorIfCannotSplitShard(ShardInterval *sourceShard);
 static void CreateSplitOffShards(ShardInterval *sourceShard, int hashedValue,
                                  List **splitOffShardList, int *isolatedShardId);
 static List * ShardTemplateList(ShardInterval *sourceShard, int hashedValue,
@@ -57,13 +57,6 @@ static ShardInterval * CreateSplitOffShardFromTemplate(ShardInterval *shardTempl
                                                        Oid relationId);
 static List * SplitOffCommandList(ShardInterval *sourceShard,
                                   ShardInterval *splitOffShard);
-static void ExecuteCommandListOnPlacements(List *commandList, List *placementList);
-static void InsertSplitOffShardMetadata(List *splitOffShardList,
-                                        List *sourcePlacementList);
-static void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList);
-static void ExecuteCommandListOnWorker(char *nodeName, int nodePort, List *commandList);
-static void DropShardList(List *shardIntervalList);
-

 /*
  * isolate_tenant_to_new_shard isolates a tenant to its own shard by splitting
@@ -245,7 +238,7 @@ SplitShardByValue(ShardInterval *sourceShard, Datum distributionValueDatum)
     /* get locks */
     BlockWritesToShardList(colocatedShardList);

-    ErrorIfCannotSplitShard(sourceShard);
+    ErrorIfCannotSplitShard(ISOLATE_TENANT_TO_NEW_SHARD, sourceShard);

     /* get hash function name */
     CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
@@ -283,154 +276,6 @@ SplitShardByValue(ShardInterval *sourceShard, Datum distributionValueDatum)
     return isolatedShardId;
 }

-
-/*
- * CreateForeignConstraints creates the foreign constraints on the newly
- * created shards via the tenant isolation.
- *
- * The function treats foreign keys to reference tables and foreign keys to
- * co-located distributed tables differently. The former one needs to be
- * executed over a single connection to prevent self-deadlocks. The latter
- * one can be executed in parallel if there are multiple replicas.
- */
-static void
-CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList)
-{
-    ListCell *splitOffShardCell = NULL;
-
-    List *colocatedShardForeignConstraintCommandList = NIL;
-    List *referenceTableForeignConstraintList = NIL;
-
-    foreach(splitOffShardCell, splitOffShardList)
-    {
-        ShardInterval *splitOffShard = (ShardInterval *) lfirst(splitOffShardCell);
-
-        List *currentColocatedForeignKeyList = NIL;
-        List *currentReferenceForeignKeyList = NIL;
-
-        CopyShardForeignConstraintCommandListGrouped(splitOffShard,
-                                                     &currentColocatedForeignKeyList,
-                                                     &currentReferenceForeignKeyList);
-
-        colocatedShardForeignConstraintCommandList =
-            list_concat(colocatedShardForeignConstraintCommandList,
-                        currentColocatedForeignKeyList);
-        referenceTableForeignConstraintList =
-            list_concat(referenceTableForeignConstraintList,
-                        currentReferenceForeignKeyList);
-    }
-
-    /*
-     * We can use parallel connections while creating co-located foreign keys
-     * if there are multiple replicas.
-     * However, foreign keys to reference tables need to be created using a single
-     * connection per worker to prevent self-deadlocks.
-     */
-    if (colocatedShardForeignConstraintCommandList != NIL)
-    {
-        ExecuteCommandListOnPlacements(colocatedShardForeignConstraintCommandList,
-                                       sourcePlacementList);
-    }
-
-    if (referenceTableForeignConstraintList != NIL)
-    {
-        ListCell *shardPlacementCell = NULL;
-        foreach(shardPlacementCell, sourcePlacementList)
-        {
-            ShardPlacement *shardPlacement =
-                (ShardPlacement *) lfirst(shardPlacementCell);
-
-            char *nodeName = shardPlacement->nodeName;
-            int32 nodePort = shardPlacement->nodePort;
-
-            /*
-             * We're using the connections that we've used for dropping the
-             * source placements within the same coordinated transaction.
-             */
-            ExecuteCommandListOnWorker(nodeName, nodePort,
-                                       referenceTableForeignConstraintList);
-        }
-    }
-}
-
-
-/*
- * ExecuteCommandListOnWorker executes the given commands on the given node
- * within the coordinated 2PC.
- */
-static void
-ExecuteCommandListOnWorker(char *nodeName, int nodePort, List *commandList)
-{
-    ListCell *commandCell = NULL;
-
-    foreach(commandCell, commandList)
-    {
-        char *command = (char *) lfirst(commandCell);
-
-        SendCommandToWorker(nodeName, nodePort, command);
-    }
-}
-
-
-/*
- * ErrorIfCannotSplitShard checks relation kind and invalid shards. It errors
- * out if we are not able to split the given shard.
- */
-static void
-ErrorIfCannotSplitShard(ShardInterval *sourceShard)
-{
-    Oid relationId = sourceShard->relationId;
-    ListCell *colocatedTableCell = NULL;
-    ListCell *colocatedShardCell = NULL;
-
-    /* checks for table ownership and foreign tables */
-    List *colocatedTableList = ColocatedTableList(relationId);
-    foreach(colocatedTableCell, colocatedTableList)
-    {
-        Oid colocatedTableId = lfirst_oid(colocatedTableCell);
-
-        /* check that user has owner rights in all co-located tables */
-        EnsureTableOwner(colocatedTableId);
-
-        char relationKind = get_rel_relkind(colocatedTableId);
-        if (relationKind == RELKIND_FOREIGN_TABLE)
-        {
-            char *relationName = get_rel_name(colocatedTableId);
-            ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                            errmsg("cannot isolate tenant because \"%s\" is a "
-                                   "foreign table", relationName),
-                            errdetail("Isolating shards backed by foreign tables "
-                                      "is not supported.")));
-        }
-    }
-
-    /* check shards with inactive placements */
-    List *colocatedShardList = ColocatedShardIntervalList(sourceShard);
-    foreach(colocatedShardCell, colocatedShardList)
-    {
-        ShardInterval *shardInterval = (ShardInterval *) lfirst(colocatedShardCell);
-        uint64 shardId = shardInterval->shardId;
-        ListCell *shardPlacementCell = NULL;
-
-        List *shardPlacementList = ShardPlacementListWithoutOrphanedPlacements(shardId);
-        foreach(shardPlacementCell, shardPlacementList)
-        {
-            ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
-            if (placement->shardState != SHARD_STATE_ACTIVE)
-            {
-                char *relationName = get_rel_name(shardInterval->relationId);
-                ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                errmsg("cannot isolate tenant because relation "
-                                       "\"%s\" has an inactive shard placement "
-                                       "for the shard %lu", relationName, shardId),
-                                errhint("Use master_copy_shard_placement UDF to "
-                                        "repair the inactive shard placement.")));
-            }
-        }
-    }
-}
-
-
 /*
  * CreateSplitOffShards gets a shard and a hashed value to pick the split point.
  * First, it creates templates to create new shards. Then, for every colocated
@@ -611,212 +456,3 @@ SplitOffCommandList(ShardInterval *sourceShard, ShardInterval *splitOffShard)
     return splitOffCommandList;
 }
-
-
-/*
- * ExecuteCommandListOnPlacements runs the given command list on the nodes of
- * the given shard placement list. First, it creates connections. Then it sends
- * commands one by one. For every command, first it sends the command to all
- * connections and then checks the results. This helps to run long-running
- * commands in parallel. Finally, it sends commit messages to all connections
- * and closes them.
- */
-static void
-ExecuteCommandListOnPlacements(List *commandList, List *placementList)
-{
-    List *workerConnectionList = NIL;
-    ListCell *workerConnectionCell = NULL;
-    ListCell *shardPlacementCell = NULL;
-    ListCell *commandCell = NULL;
-
-    /* create connections and start transactions */
-    foreach(shardPlacementCell, placementList)
-    {
-        ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell);
-        char *nodeName = shardPlacement->nodeName;
-        int32 nodePort = shardPlacement->nodePort;
-
-        int connectionFlags = FORCE_NEW_CONNECTION;
-        char *currentUser = CurrentUserName();
-
-        /* create a new connection */
-        MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
-                                                                          nodeName,
-                                                                          nodePort,
-                                                                          currentUser,
-                                                                          NULL);
-
-        /* mark connection as critical and start transaction */
-        MarkRemoteTransactionCritical(workerConnection);
-        RemoteTransactionBegin(workerConnection);
-
-        /* add connection to the list */
-        workerConnectionList = lappend(workerConnectionList, workerConnection);
-    }
-
-    /* send and check results for every command one by one */
-    foreach(commandCell, commandList)
-    {
-        char *command = lfirst(commandCell);
-
-        /* first only send the command */
-        foreach(workerConnectionCell, workerConnectionList)
-        {
-            MultiConnection *workerConnection =
-                (MultiConnection *) lfirst(workerConnectionCell);
-
-            int querySent = SendRemoteCommand(workerConnection, command);
-            if (querySent == 0)
-            {
-                ReportConnectionError(workerConnection, ERROR);
-            }
-        }
-
-        /* then check the result separately to run long-running commands in parallel */
-        foreach(workerConnectionCell, workerConnectionList)
-        {
-            MultiConnection *workerConnection =
-                (MultiConnection *) lfirst(workerConnectionCell);
-            bool raiseInterrupts = true;
-
-            PGresult *result = GetRemoteCommandResult(workerConnection, raiseInterrupts);
-            if (!IsResponseOK(result))
-            {
-                ReportResultError(workerConnection, result, ERROR);
-            }
-
-            PQclear(result);
-            ForgetResults(workerConnection);
-        }
-    }
-
-    /* finally commit each transaction and close connections */
-    foreach(workerConnectionCell, workerConnectionList)
-    {
-        MultiConnection *workerConnection =
-            (MultiConnection *) lfirst(workerConnectionCell);
-
-        RemoteTransactionCommit(workerConnection);
-        CloseConnection(workerConnection);
-    }
-}
-
-
-/*
- * InsertSplitOffShardMetadata inserts new shard and shard placement data into
- * catalog tables on both the coordinator and mx nodes.
- */
-static void
-InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList)
-{
-    List *syncedShardList = NIL;
-    ListCell *shardCell = NULL;
-    ListCell *commandCell = NULL;
-
-    /* add new metadata */
-    foreach(shardCell, splitOffShardList)
-    {
-        ShardInterval *splitOffShard = (ShardInterval *) lfirst(shardCell);
-        Oid relationId = splitOffShard->relationId;
-        uint64 shardId = splitOffShard->shardId;
-        char storageType = splitOffShard->storageType;
-        ListCell *shardPlacementCell = NULL;
-
-        int32 shardMinValue = DatumGetInt32(splitOffShard->minValue);
-        int32 shardMaxValue = DatumGetInt32(splitOffShard->maxValue);
-        text *shardMinValueText = IntegerToText(shardMinValue);
-        text *shardMaxValueText = IntegerToText(shardMaxValue);
-
-        InsertShardRow(relationId, shardId, storageType, shardMinValueText,
-                       shardMaxValueText);
-
-        /* split off shard placement metadata */
-        foreach(shardPlacementCell, sourcePlacementList)
-        {
-            ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
-            uint64 shardSize = 0;
-
-            InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, SHARD_STATE_ACTIVE,
-                                    shardSize, placement->groupId);
-        }
-
-        if (ShouldSyncTableMetadata(relationId))
-        {
-            syncedShardList = lappend(syncedShardList, splitOffShard);
-        }
-    }
-
-    /* send commands to synced nodes one by one */
-    List *splitOffShardMetadataCommandList = ShardListInsertCommand(syncedShardList);
-    foreach(commandCell, splitOffShardMetadataCommandList)
-    {
-        char *command = (char *) lfirst(commandCell);
-        SendCommandToWorkersWithMetadata(command);
-    }
-}
-
-
-/*
- * DropShardList drops shards and their metadata from both the coordinator and
- * mx nodes.
- */
-static void
-DropShardList(List *shardIntervalList)
-{
-    ListCell *shardIntervalCell = NULL;
-
-    foreach(shardIntervalCell, shardIntervalList)
-    {
-        ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
-        ListCell *shardPlacementCell = NULL;
-        Oid relationId = shardInterval->relationId;
-        uint64 oldShardId = shardInterval->shardId;
-
-        /* delete metadata from synced nodes */
-        if (ShouldSyncTableMetadata(relationId))
-        {
-            ListCell *commandCell = NULL;
-
-            /* send the commands one by one */
-            List *shardMetadataDeleteCommandList = ShardDeleteCommandList(shardInterval);
-            foreach(commandCell, shardMetadataDeleteCommandList)
-            {
-                char *command = (char *) lfirst(commandCell);
-                SendCommandToWorkersWithMetadata(command);
-            }
-        }
-
-        /* delete shard placements and drop shards */
-        List *shardPlacementList = ActiveShardPlacementList(oldShardId);
-        foreach(shardPlacementCell, shardPlacementList)
-        {
-            ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
-            char *workerName = placement->nodeName;
-            uint32 workerPort = placement->nodePort;
-            StringInfo dropQuery = makeStringInfo();
-
-            DeleteShardPlacementRow(placement->placementId);
-
-            /* get shard name */
-            char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
-
-            char storageType = shardInterval->storageType;
-            if (storageType == SHARD_STORAGE_TABLE)
-            {
-                appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND,
-                                 qualifiedShardName);
-            }
-            else if (storageType == SHARD_STORAGE_FOREIGN)
-            {
-                appendStringInfo(dropQuery, DROP_FOREIGN_TABLE_COMMAND,
-                                 qualifiedShardName);
-            }
-
-            /* drop old shard */
-            SendCommandToWorker(workerName, workerPort, dropQuery->data);
-        }
-
-        /* delete shard row */
-        DeleteShardRow(oldShardId);
-    }
-}
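The value of the deleted ExecuteCommandListOnPlacements above is its two-phase loop: every connection gets a command in flight before any result is awaited, so slow DDL overlaps across workers instead of serializing. Stripped to its skeleton, reusing only the Citus connection helpers already visible in the deleted code (a sketch for illustration, not code from this commit):

/* Sketch: pipeline one command across an already-opened connection list. */
static void
RunCommandPipelined(char *command, List *connectionList)
{
    MultiConnection *connection = NULL;

    /* phase 1: put a command in flight on every connection without waiting */
    foreach_ptr(connection, connectionList)
    {
        if (SendRemoteCommand(connection, command) == 0)
        {
            ReportConnectionError(connection, ERROR);
        }
    }

    /* phase 2: only now collect results; per-node latency overlaps */
    foreach_ptr(connection, connectionList)
    {
        bool raiseInterrupts = true;
        PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
        if (!IsResponseOK(result))
        {
            ReportResultError(connection, result, ERROR);
        }

        PQclear(result);
        ForgetResults(connection);
    }
}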
@@ -33,22 +33,12 @@
 #include "distributed/metadata_sync.h"
 #include "distributed/multi_physical_planner.h"

-/*
- * These will be public functions when citus-enterprise is merged
- */
-static void ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShard);
-static void InsertSplitOffShardMetadata(List *splitOffShardList,
-                                        List *sourcePlacementList);
-static void DropShardList(List *shardIntervalList);
-static void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList);
-static void ExecuteCommandListOnPlacements(List *commandList, List *placementList);
-
 /* Function declarations */
 static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
                                             ShardInterval *shardIntervalToSplit,
                                             List *shardSplitPointsList,
                                             List *nodeIdsForPlacementList);
-static void CreateSplitShardsForShardGroup(uint32_t sourceShardNodeId,
+static void CreateSplitShardsForShardGroup(WorkerNode* sourceShardNode,
                                            List *sourceColocatedShardIntervalList,
                                            List *shardGroupSplitIntervalListList,
                                            List *workersForPlacementList,
@@ -64,17 +54,19 @@ static void BlockingShardSplit(SplitOperation splitOperation,
                               ShardInterval *shardIntervalToSplit,
                               List *shardSplitPointsList,
                               List *workersForPlacementList);
-static void DoSplitCopy(uint32_t sourceShardNodeId, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *workersForPlacementList);
+static void DoSplitCopy(WorkerNode* sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *workersForPlacementList);
 static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChildrenShardIntervalList, List* workersForPlacementList);

 /* Customize error message strings based on operation type */
 static const char *const SplitOperationName[] =
 {
-    [SHARD_SPLIT_API] = "split"
+    [SHARD_SPLIT_API] = "split",
+    [ISOLATE_TENANT_TO_NEW_SHARD] = "isolate",
 };
 static const char *const SplitTargetName[] =
 {
-    [SHARD_SPLIT_API] = "shard"
+    [SHARD_SPLIT_API] = "shard",
+    [ISOLATE_TENANT_TO_NEW_SHARD] = "tenant",
 };
 static const char *const SplitOperationType[] =
 {
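These operation-keyed tables are the unification mechanism for error text: both entry points share one error path, and the SplitOperation value selects the wording. A standalone illustration of the lookup pattern (plain C with illustrative values; not the commit's error-reporting code):

#include <stdio.h>

typedef enum SplitOperation
{
    SHARD_SPLIT_API = 0,
    ISOLATE_TENANT_TO_NEW_SHARD
} SplitOperation;

static const char *const SplitOperationName[] = {
    [SHARD_SPLIT_API] = "split",
    [ISOLATE_TENANT_TO_NEW_SHARD] = "isolate",
};

static const char *const SplitTargetName[] = {
    [SHARD_SPLIT_API] = "shard",
    [ISOLATE_TENANT_TO_NEW_SHARD] = "tenant",
};

int
main(void)
{
    SplitOperation operation = ISOLATE_TENANT_TO_NEW_SHARD;

    /* prints "cannot isolate tenant"; SHARD_SPLIT_API yields "cannot split shard" */
    printf("cannot %s %s\n", SplitOperationName[operation], SplitTargetName[operation]);
    return 0;
}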
@@ -345,10 +337,6 @@ SplitShard(SplitMode splitMode,
         LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock);
     }

-    /*
-     * TODO(niupre): When all consumers (Example : ISOLATE_TENANT_TO_NEW_SHARD) directly call 'SplitShard' API,
-     * these two methods will be merged.
-     */
     ErrorIfCannotSplitShard(SHARD_SPLIT_API, shardIntervalToSplit);
     ErrorIfCannotSplitShardExtended(
         SHARD_SPLIT_API,
@@ -386,7 +374,7 @@ SplitShard(SplitMode splitMode,
     else
     {
         /* we only support blocking shard split in this code path for now. */
-        ereport(ERROR, (errmsg("Invalid split mode %s.", SplitOperationType[splitMode])));
+        ereport(ERROR, (errmsg("Invalid split mode value %d.", splitMode)));
     }
 }
@@ -415,7 +403,7 @@ BlockingShardSplit(SplitOperation splitOperation,
         sourceColocatedShardIntervalList,
         shardSplitPointsList);

-    // Only single placement allowed (already validated by caller)
+    /* Only single placement allowed (already validated by caller) */
     List *sourcePlacementList = ActiveShardPlacementList(shardIntervalToSplit->shardId);
     Assert(sourcePlacementList->length == 1);
     WorkerNode* sourceShardToCopyNode = (WorkerNode *) linitial(sourcePlacementList);
@@ -423,7 +411,7 @@ BlockingShardSplit(SplitOperation splitOperation,
     /* Physically create split children and perform split copy */
     List *splitOffShardList = NULL;
     CreateSplitShardsForShardGroup(
-        sourceShardToCopyNode->nodeId,
+        sourceShardToCopyNode,
         sourceColocatedShardIntervalList,
         shardGroupSplitIntervalListList,
         workersForPlacementList,
@@ -451,7 +439,7 @@ BlockingShardSplit(SplitOperation splitOperation,

 /* Create ShardGroup split children on a list of corresponding workers. */
 static void
-CreateSplitShardsForShardGroup(uint32_t sourceShardNodeId,
+CreateSplitShardsForShardGroup(WorkerNode* sourceShardNode,
                                List *sourceColocatedShardIntervalList,
                                List *shardGroupSplitIntervalListList,
                                List *workersForPlacementList,
@@ -483,26 +471,48 @@ CreateSplitShardsForShardGroup(uint32_t sourceShardNodeId,
     }

     /* Perform Split Copy */
-    DoSplitCopy(sourceShardNodeId, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, workersForPlacementList);
+    DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, workersForPlacementList);
+
+    // TODO(niupre) : Use Adaptive execution for creating multiple indexes in parallel.
+    foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
+    {
+        ShardInterval *shardInterval = NULL;
+        WorkerNode *workerPlacementNode = NULL;
+        forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
+                    workersForPlacementList)
+        {
+            List *indexCommandList = GetPostLoadTableCreationCommands(
+                shardInterval->relationId,
+                true /* includeIndexes */,
+                true /* includeReplicaIdentity */);
+            indexCommandList = WorkerApplyShardDDLCommandList(
+                indexCommandList,
+                shardInterval->shardId);
+
+            CreateObjectOnPlacement(indexCommandList, workerPlacementNode);
+        }
+    }
 }

+/*
+ * Perform Split Copy from source shard(s) to split children.
+ * 'sourceShardNode' : Source shard worker node.
+ * 'sourceColocatedShardIntervalList' : List of source shard intervals from shard group.
+ * 'shardGroupSplitIntervalListList' : List of shard intervals for split children.
+ * 'destinationWorkerNodesList' : List of workers for split children placement.
+ */
 static void
-DoSplitCopy(uint32_t sourceShardNodeId, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *workersForPlacementList)
+DoSplitCopy(WorkerNode* sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList)
 {
     ShardInterval *sourceShardIntervalToCopy = NULL;
     List *splitShardIntervalList = NULL;

-    WorkerNode *workerNodeSource = NULL;
-    WorkerNode *workerNodeDestination = NULL;
     int taskId = 0;
     List *splitCopyTaskList = NULL;
-    forthree_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList,
-                 splitShardIntervalList, shardGroupSplitIntervalListList,
-                 workerNodeDestination, workersForPlacementList)
+    forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList,
+                splitShardIntervalList, shardGroupSplitIntervalListList)
     {
-        StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(sourceShardIntervalToCopy, splitShardIntervalList, workersForPlacementList);
+        StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(sourceShardIntervalToCopy, splitShardIntervalList, destinationWorkerNodesList);

         Task *task = CreateBasicTask(
             sourceShardIntervalToCopy->shardId, /* jobId */
@@ -511,7 +521,7 @@ DoSplitCopy(uint32_t sourceShardNodeId, List *sourceColocatedShardIntervalList,
             splitCopyUdfCommand->data);

         ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
-        SetPlacementNodeMetadata(taskPlacement, workerNodeSource);
+        SetPlacementNodeMetadata(taskPlacement, sourceShardNode);

         task->taskPlacementList = list_make1(taskPlacement);

@@ -519,12 +529,17 @@ DoSplitCopy(uint32_t sourceShardNodeId, List *sourceColocatedShardIntervalList,
         taskId++;
     }

-    // TODO(niupre) : Pass appropriate MaxParallelShards value from GUC.
-    ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, splitCopyTaskList, 1, NULL /* jobIdList */);
+    ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, splitCopyTaskList, MaxAdaptiveExecutorPoolSize, NULL /* jobIdList */);
 }

+/*
+ * Create Copy command for a given source shard to be copied to corresponding split children.
+ * 'sourceShardSplitInterval' : Source shard interval to be copied.
+ * 'splitChildrenShardIntervalList' : List of shard intervals for split children.
+ * 'destinationWorkerNodesList' : List of workers for split children placement.
+ */
 static StringInfo
-CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChildrenShardIntervalList, List* workersForPlacementList)
+CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChildrenShardIntervalList, List* destinationWorkerNodesList)
 {
     StringInfo splitCopyInfoArray = makeStringInfo();
     appendStringInfo(splitCopyInfoArray, "ARRAY[");
@@ -532,7 +547,7 @@ CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChild
     ShardInterval *splitChildShardInterval = NULL;
     bool addComma = false;
     WorkerNode *destinationWorkerNode = NULL;
-    forboth_ptr(splitChildShardInterval, splitChildrenShardIntervalList, destinationWorkerNode, workersForPlacementList)
+    forboth_ptr(splitChildShardInterval, splitChildrenShardIntervalList, destinationWorkerNode, destinationWorkerNodesList)
     {
         if(addComma)
         {
@@ -646,7 +661,7 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard,
  * InsertSplitOffShardMetadata inserts new shard and shard placement data into
  * catalog tables on both the coordinator and mx nodes.
  */
-static void
+void
 InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList)
 {
     List *syncedShardList = NIL;
@@ -699,7 +714,7 @@ InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList)
  * DropShardList drops shards and their metadata from both the coordinator and
  * mx nodes.
  */
-static void
+void
 DropShardList(List *shardIntervalList)
 {
     ListCell *shardIntervalCell = NULL;
@@ -769,7 +784,7 @@ DropShardList(List *shardIntervalList)
  * executed over a single connection to prevent self-deadlocks. The latter
  * one can be executed in parallel if there are multiple replicas.
  */
-static void
+void
 CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList)
 {
     ListCell *splitOffShardCell = NULL;
@@ -840,7 +855,7 @@ CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList)
  * commands in parallel. Finally, it sends commit messages to all connections
  * and closes them.
  */
-static void
+void
 ExecuteCommandListOnPlacements(List *commandList, List *placementList)
 {
     List *workerConnectionList = NIL;
@@ -12,10 +12,13 @@
 #include "postgres.h"
 #include "miscadmin.h"

+#include "pg_version_compat.h"
+#include "pgtypes.h"
+#include "catalog/pg_type.h"
 #include "nodes/pg_list.h"
 #include "distributed/utils/array_type.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -133,14 +136,26 @@ extern List * TextArrayTypeToIntegerList(ArrayType *arrayObject, Oid datumTypeId
     switch (datumTypeId)
     {
         case INT2OID:
-            list = lappend(list, pg_strtoint16(intAsStr));
+        {
+            int16_t *int16Value = palloc0(sizeof(int16_t));
+            *int16Value = pg_strtoint16(intAsStr);
+            list = lappend(list, (void*) int16Value);
+            break;
+        }
         case INT4OID:
-            list = lappend(list, pg_strtoint32(intAsStr));
+        {
+            int32_t *int32Value = palloc0(sizeof(int32_t));
+            *int32Value = pg_strtoint32(intAsStr);
+            list = lappend(list, (void*) int32Value);
+            break;
+        }
         case INT8OID:
-            list = lappend(list, pg_strtoint64(intAsStr));
+        {
+            int64_t *int64Value = palloc0(sizeof(int64_t));
+            *int64Value = pg_strtoint64(intAsStr);
+            list = lappend(list, (void*) int64Value);
+            break;
+        }
         default:
             ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                             errmsg("Unsupported datum type for array.")));
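The rewritten cases matter because a PostgreSQL List cell stores a pointer: the old lines type-punned the parsed integer into the pointer slot, while the new code palloc's a typed value and stores its address. Callers therefore dereference per element. A consumer sketch for the INT4OID case (the function and variable names are illustrative, not from this commit):

/* Sketch: consume a list built by TextArrayTypeToIntegerList with INT4OID. */
static int64
SumInt32List(List *integerPointerList)
{
    int64 sum = 0;
    int32_t *valuePointer = NULL;

    foreach_ptr(valuePointer, integerPointerList)
    {
        /* each cell holds a palloc'd int32_t, not an inline integer */
        sum += *valuePointer;
    }

    return sum;
}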
@@ -97,26 +97,6 @@ typedef struct ListCellAndListWrapper
             var2 ## CellDoNotUse = lnext_compat(l2, var2 ## CellDoNotUse) \
     )

-/*
- * forthree_ptr -
- *    a convenience macro which loops through three lists of pointers at the same
- *    time, without needing a ListCell. It only needs three declared pointer
- *    variables to store the pointer of each of the three cells in.
- */
-#define forthree_ptr(var1, l1, var2, l2, var3, l3) \
-    for (ListCell *(var1 ## CellDoNotUse) = list_head(l1), \
-         *(var2 ## CellDoNotUse) = list_head(l2), \
-         *(var3 ## CellDoNotUse) = list_head(l3); \
-         (var1 ## CellDoNotUse) != NULL && \
-         (var2 ## CellDoNotUse) != NULL && \
-         (var3 ## CellDoNotUse) != NULL && \
-         (((var1) = lfirst(var1 ## CellDoNotUse)) || true) && \
-         (((var2) = lfirst(var2 ## CellDoNotUse)) || true) && \
-         (((var3) = lfirst(var3 ## CellDoNotUse)) || true); \
-         var1 ## CellDoNotUse = lnext_compat(l1, var1 ## CellDoNotUse), \
-         var2 ## CellDoNotUse = lnext_compat(l2, var2 ## CellDoNotUse), \
-         var3 ## CellDoNotUse = lnext_compat(l3, var3 ## CellDoNotUse) \
-    )
-
 /*
  * forboth_ptr_oid -
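forthree_ptr can be dropped because, after this commit, DoSplitCopy pairs only two lists (source intervals with their split-interval lists) and CreateSplitCopyCommand consumes the destination node list itself. For reference, a minimal usage sketch of the surviving forboth_ptr macro (the function and log message are illustrative):

/* Sketch: walk two parallel pointer lists in lockstep with forboth_ptr. */
static void
LogShardPlacements(List *shardIntervalList, List *workerNodeList)
{
    ShardInterval *shardInterval = NULL;
    WorkerNode *workerNode = NULL;

    forboth_ptr(shardInterval, shardIntervalList, workerNode, workerNodeList)
    {
        elog(DEBUG1, "shard " UINT64_FORMAT " is placed on node %u",
             shardInterval->shardId, workerNode->nodeId);
    }
}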
@@ -9,13 +9,13 @@
  *-------------------------------------------------------------------------
  */

-#ifndef SHARDSPLIT_UTILS_H_
-#define SHARDSPLIT_UTILS_H_
+#ifndef SHARDSPLIT_H_
+#define SHARDSPLIT_H_

 /* Split Modes supported by Shard Split API */
 typedef enum SplitMode
 {
-    BLOCKING_SPLIT = 0,
+    BLOCKING_SPLIT = 0
 } SplitMode;

 /*
@@ -24,12 +24,13 @@ typedef enum SplitMode
  */
 typedef enum SplitOperation
 {
-    SHARD_SPLIT_API = 0
+    SHARD_SPLIT_API = 0,
+    ISOLATE_TENANT_TO_NEW_SHARD
 } SplitOperation;

 /*
- * SplitShard API to split a given shard (or shard group) in blocking / non-blocking fashion
- * based on specified split points to a set of destination nodes.
+ * SplitShard API to split a given shard (or shard group) using split mode and
+ * specified split points to a set of destination nodes.
  */
 extern void SplitShard(SplitMode splitMode,
                        SplitOperation splitOperation,
@@ -37,4 +38,11 @@ extern void SplitShard(SplitMode splitMode,
                        List *shardSplitPointsList,
                        List *nodeIdsForPlacementList);

-#endif /* SHARDSPLIT_UTILS_H_ */
+/* TODO(niupre): Make all these APIs private when all consumers (Example : ISOLATE_TENANT_TO_NEW_SHARD) directly call 'SplitShard' API. */
+extern void ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShard);
+extern void InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList);
+extern void DropShardList(List *shardIntervalList);
+extern void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList);
+extern void ExecuteCommandListOnPlacements(List *commandList, List *placementList);
+
+#endif /* SHARDSPLIT_H_ */