Merge pull request #2769 from citusdata/refactor_create_dist_table

Refactor shard creation logic
pull/2774/head
Önder Kalacı 2019-06-19 16:07:38 +02:00 committed by GitHub
commit dabe1e0add
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 147 additions and 26 deletions

View File

@ -363,6 +363,24 @@ LogRemoteCommand(MultiConnection *connection, const char *command)
/* wrappers around libpq functions, with command logging support */ /* wrappers around libpq functions, with command logging support */
/*
* ExecuteCriticalRemoteCommandList calls ExecuteCriticalRemoteCommand for every
* command in the commandList.
*/
void
ExecuteCriticalRemoteCommandList(MultiConnection *connection, List *commandList)
{
ListCell *commandCell = NULL;
foreach(commandCell, commandList)
{
char *command = (char *) lfirst(commandCell);
ExecuteCriticalRemoteCommand(connection, command);
}
}
/* /*
* ExecuteCriticalRemoteCommand executes a remote command that is critical * ExecuteCriticalRemoteCommand executes a remote command that is critical
* to the transaction. If the command fails then the transaction aborts. * to the transaction. If the command fails then the transaction aborts.

View File

@ -58,6 +58,7 @@
/* Local functions forward declarations */ /* Local functions forward declarations */
static List * RelationShardListForShardCreate(ShardInterval *shardInterval);
static bool WorkerShardStats(ShardPlacement *placement, Oid relationId, static bool WorkerShardStats(ShardPlacement *placement, Oid relationId,
char *shardName, uint64 *shardSize, char *shardName, uint64 *shardSize,
text **shardMinValue, text **shardMaxValue); text **shardMinValue, text **shardMaxValue);
@ -403,6 +404,7 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
MultiConnection *connection = MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort, GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort,
relationOwner, NULL); relationOwner, NULL);
List *commandList = NIL;
if (PQstatus(connection->pgConn) != CONNECTION_OK) if (PQstatus(connection->pgConn) != CONNECTION_OK)
{ {
@ -412,8 +414,11 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
continue; continue;
} }
WorkerCreateShard(relationId, shardIndex, shardId, ddlCommandList, commandList = WorkerCreateShardCommandList(relationId, shardIndex, shardId,
foreignConstraintCommandList, connection); ddlCommandList,
foreignConstraintCommandList);
ExecuteCriticalRemoteCommandList(connection, commandList);
InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardSize, InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardSize,
nodeGroupId); nodeGroupId);
@ -529,8 +534,10 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell); ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell);
uint64 shardId = shardPlacement->shardId; uint64 shardId = shardPlacement->shardId;
ShardInterval *shardInterval = LoadShardInterval(shardId); ShardInterval *shardInterval = LoadShardInterval(shardId);
List *relationShardList = NIL;
MultiConnection *connection = NULL; MultiConnection *connection = NULL;
int shardIndex = -1; int shardIndex = -1;
List *commandList = NIL;
if (colocatedShard) if (colocatedShard)
{ {
@ -544,18 +551,10 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
*/ */
if (ShouldRecordRelationAccess() && useExclusiveConnection && partitionTable) if (ShouldRecordRelationAccess() && useExclusiveConnection && partitionTable)
{ {
RelationShard *parentRelationShard = CitusMakeNode(RelationShard);
RelationShard *partitionRelationShard = CitusMakeNode(RelationShard);
List *relationShardList = NIL;
List *placementAccessList = NIL; List *placementAccessList = NIL;
parentRelationShard->relationId = PartitionParentOid(distributedRelationId); relationShardList = RelationShardListForShardCreate(shardInterval);
parentRelationShard->shardId =
ColocatedShardIdInRelation(parentRelationShard->relationId, shardIndex);
partitionRelationShard->relationId = distributedRelationId;
partitionRelationShard->shardId = shardId;
relationShardList = list_make2(parentRelationShard, partitionRelationShard);
placementAccessList = BuildPlacementDDLList(shardPlacement->groupId, placementAccessList = BuildPlacementDDLList(shardPlacement->groupId,
relationShardList); relationShardList);
@ -577,8 +576,12 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
RemoteTransactionBeginIfNecessary(connection); RemoteTransactionBeginIfNecessary(connection);
MarkRemoteTransactionCritical(connection); MarkRemoteTransactionCritical(connection);
WorkerCreateShard(distributedRelationId, shardIndex, shardId, commandList = WorkerCreateShardCommandList(distributedRelationId, shardIndex,
ddlCommandList, foreignConstraintCommandList, connection); shardId,
ddlCommandList,
foreignConstraintCommandList);
ExecuteCriticalRemoteCommandList(connection, commandList);
} }
/* /*
@ -595,14 +598,110 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
/* /*
* WorkerCreateShard applies DDL commands for the given shardId to create the * RelationShardListForShardCreate gets a shard interval and returns the placement
* shard on the worker node. Commands are sent to the worker node over the * accesses that would happen when a placement of the shard interval is created.
* given connection.
*/ */
void static List *
WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlCommandList, RelationShardListForShardCreate(ShardInterval *shardInterval)
List *foreignConstraintCommandList, MultiConnection *connection)
{ {
List *relationShardList = NIL;
RelationShard *relationShard = NULL;
Oid relationId = shardInterval->relationId;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
List *referencedRelationList = cacheEntry->referencedRelationsViaForeignKey;
List *referencingRelationList = cacheEntry->referencingRelationsViaForeignKey;
List *allForeignKeyRelations =
list_concat_unique_oid(referencedRelationList, referencingRelationList);
int shardIndex = -1;
ListCell *fkeyRelationIdCell = NULL;
/* record the placement access of the shard itself */
relationShard = CitusMakeNode(RelationShard);
relationShard->relationId = relationId;
relationShard->shardId = shardInterval->shardId;
relationShardList = list_make1(relationShard);
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
cacheEntry->colocationId != INVALID_COLOCATION_ID)
{
shardIndex = ShardIndex(shardInterval);
}
/* all foregin key constraint relations */
foreach(fkeyRelationIdCell, allForeignKeyRelations)
{
Oid fkeyRelationid = lfirst_oid(fkeyRelationIdCell);
RelationShard *fkeyRelationShard = NULL;
uint64 fkeyShardId = INVALID_SHARD_ID;
if (!IsDistributedTable(fkeyRelationid))
{
/* we're not interested in local tables */
continue;
}
if (PartitionMethod(fkeyRelationid) == DISTRIBUTE_BY_NONE)
{
fkeyShardId = GetFirstShardId(fkeyRelationid);
}
else if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
PartitionMethod(fkeyRelationid) == DISTRIBUTE_BY_HASH)
{
/* hash distributed tables should be colocated to have fkey */
Assert(TableColocationId(fkeyRelationid) == cacheEntry->colocationId);
fkeyShardId =
ColocatedShardIdInRelation(fkeyRelationid, shardIndex);
}
else
{
/*
* We currently do not support foreign keys from/to local tables or
* non-colocated tables when creating shards. Also note that shard
* creation via shard moves doesn't happen in a transaction block,
* so not relevant here.
*/
continue;
}
fkeyRelationShard = CitusMakeNode(RelationShard);
fkeyRelationShard->relationId = fkeyRelationid;
fkeyRelationShard->shardId = fkeyShardId;
relationShardList = lappend(relationShardList, fkeyRelationShard);
}
/* if partitioned table, make sure to record the parent table */
if (PartitionTable(relationId))
{
RelationShard *parentRelationShard = CitusMakeNode(RelationShard);
/* partitioned tables are always co-located */
Assert(shardIndex != -1);
parentRelationShard->relationId = PartitionParentOid(relationId);
parentRelationShard->shardId =
ColocatedShardIdInRelation(parentRelationShard->relationId, shardIndex);
relationShardList = lappend(relationShardList, parentRelationShard);
}
return relationShardList;
}
/*
* WorkerCreateShardCommandList returns a list of DDL commands for the given
* shardId to create the shard on the worker node.
*/
List *
WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId,
List *ddlCommandList,
List *foreignConstraintCommandList)
{
List *commandList = NIL;
Oid schemaId = get_rel_namespace(relationId); Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId); char *schemaName = get_namespace_name(schemaId);
char *escapedSchemaName = quote_literal_cstr(schemaName); char *escapedSchemaName = quote_literal_cstr(schemaName);
@ -627,7 +726,7 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma
escapedDDLCommand); escapedDDLCommand);
} }
ExecuteCriticalRemoteCommand(connection, applyDDLCommand->data); commandList = lappend(commandList, applyDDLCommand->data);
} }
foreach(foreignConstraintCommandCell, foreignConstraintCommandList) foreach(foreignConstraintCommandCell, foreignConstraintCommandList)
@ -683,8 +782,7 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma
WORKER_APPLY_INTER_SHARD_DDL_COMMAND, shardId, escapedSchemaName, WORKER_APPLY_INTER_SHARD_DDL_COMMAND, shardId, escapedSchemaName,
referencedShardId, escapedReferencedSchemaName, escapedCommand); referencedShardId, escapedReferencedSchemaName, escapedCommand);
commandList = lappend(commandList, applyForeignConstraintCommand->data);
ExecuteCriticalRemoteCommand(connection, applyForeignConstraintCommand->data);
} }
/* /*
@ -695,8 +793,11 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma
{ {
ShardInterval *shardInterval = LoadShardInterval(shardId); ShardInterval *shardInterval = LoadShardInterval(shardId);
char *attachPartitionCommand = GenerateAttachShardPartitionCommand(shardInterval); char *attachPartitionCommand = GenerateAttachShardPartitionCommand(shardInterval);
ExecuteCriticalRemoteCommand(connection, attachPartitionCommand);
commandList = lappend(commandList, attachPartitionCommand);
} }
return commandList;
} }

View File

@ -125,9 +125,9 @@ extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shard
extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
bool useExclusiveConnections); bool useExclusiveConnections);
extern void CreateReferenceTableShard(Oid distributedTableId); extern void CreateReferenceTableShard(Oid distributedTableId);
extern void WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, extern List * WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId,
List *ddlCommandList, List *foreignConstraintCommandList, List *ddlCommandList,
MultiConnection *connection); List *foreignConstraintCommandList);
extern Oid ForeignConstraintGetReferencedTableId(char *queryString); extern Oid ForeignConstraintGetReferencedTableId(char *queryString);
extern void CheckHashPartitionedTable(Oid distributedTableId); extern void CheckHashPartitionedTable(Oid distributedTableId);
extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName, extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName,

View File

@ -39,6 +39,8 @@ extern char * pchomp(const char *in);
extern void LogRemoteCommand(MultiConnection *connection, const char *command); extern void LogRemoteCommand(MultiConnection *connection, const char *command);
/* wrappers around libpq functions, with command logging support */ /* wrappers around libpq functions, with command logging support */
extern void ExecuteCriticalRemoteCommandList(MultiConnection *connection,
List *commandList);
extern void ExecuteCriticalRemoteCommand(MultiConnection *connection, extern void ExecuteCriticalRemoteCommand(MultiConnection *connection,
const char *command); const char *command);
extern int ExecuteOptionalRemoteCommand(MultiConnection *connection, extern int ExecuteOptionalRemoteCommand(MultiConnection *connection,