Refactor shard creation logic

This is a preperation for the new executor, where creating shards
would go through the executor. So, explicitly generate the commands
for further processing.
pull/2769/head
Onder Kalaci 2019-06-17 16:12:51 +02:00
parent 96d9847aa4
commit 4fd1fcbbef
4 changed files with 50 additions and 17 deletions

View File

@ -363,6 +363,24 @@ LogRemoteCommand(MultiConnection *connection, const char *command)
/* wrappers around libpq functions, with command logging support */
/*
* ExecuteCriticalRemoteCommandList calls ExecuteCriticalRemoteCommand for every
* command in the commandList.
*/
void
ExecuteCriticalRemoteCommandList(MultiConnection *connection, List *commandList)
{
ListCell *commandCell = NULL;
foreach(commandCell, commandList)
{
char *command = (char *) lfirst(commandCell);
ExecuteCriticalRemoteCommand(connection, command);
}
}
/*
* ExecuteCriticalRemoteCommand executes a remote command that is critical
* to the transaction. If the command fails then the transaction aborts.

View File

@ -403,6 +403,7 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort,
relationOwner, NULL);
List *commandList = NIL;
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
@ -412,8 +413,11 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
continue;
}
WorkerCreateShard(relationId, shardIndex, shardId, ddlCommandList,
foreignConstraintCommandList, connection);
commandList = WorkerCreateShardCommandList(relationId, shardIndex, shardId,
ddlCommandList,
foreignConstraintCommandList);
ExecuteCriticalRemoteCommandList(connection, commandList);
InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardSize,
nodeGroupId);
@ -531,6 +535,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
ShardInterval *shardInterval = LoadShardInterval(shardId);
MultiConnection *connection = NULL;
int shardIndex = -1;
List *commandList = NIL;
if (colocatedShard)
{
@ -577,8 +582,12 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
RemoteTransactionBeginIfNecessary(connection);
MarkRemoteTransactionCritical(connection);
WorkerCreateShard(distributedRelationId, shardIndex, shardId,
ddlCommandList, foreignConstraintCommandList, connection);
commandList = WorkerCreateShardCommandList(distributedRelationId, shardIndex,
shardId,
ddlCommandList,
foreignConstraintCommandList);
ExecuteCriticalRemoteCommandList(connection, commandList);
}
/*
@ -595,14 +604,16 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
/*
* WorkerCreateShard applies DDL commands for the given shardId to create the
* shard on the worker node. Commands are sent to the worker node over the
* given connection.
* WorkerCreateShardCommandList returns a list of DDL commands for the given
* shardId to create the shard on the worker node.
*
*/
void
WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlCommandList,
List *foreignConstraintCommandList, MultiConnection *connection)
List *
WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId,
List *ddlCommandList,
List *foreignConstraintCommandList)
{
List *commandList = NIL;
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
char *escapedSchemaName = quote_literal_cstr(schemaName);
@ -627,7 +638,7 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma
escapedDDLCommand);
}
ExecuteCriticalRemoteCommand(connection, applyDDLCommand->data);
commandList = lappend(commandList, applyDDLCommand->data);
}
foreach(foreignConstraintCommandCell, foreignConstraintCommandList)
@ -683,8 +694,7 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma
WORKER_APPLY_INTER_SHARD_DDL_COMMAND, shardId, escapedSchemaName,
referencedShardId, escapedReferencedSchemaName, escapedCommand);
ExecuteCriticalRemoteCommand(connection, applyForeignConstraintCommand->data);
commandList = lappend(commandList, applyForeignConstraintCommand->data);
}
/*
@ -695,8 +705,11 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma
{
ShardInterval *shardInterval = LoadShardInterval(shardId);
char *attachPartitionCommand = GenerateAttachShardPartitionCommand(shardInterval);
ExecuteCriticalRemoteCommand(connection, attachPartitionCommand);
commandList = lappend(commandList, attachPartitionCommand);
}
return commandList;
}

View File

@ -125,9 +125,9 @@ extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shard
extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
bool useExclusiveConnections);
extern void CreateReferenceTableShard(Oid distributedTableId);
extern void WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId,
List *ddlCommandList, List *foreignConstraintCommandList,
MultiConnection *connection);
extern List * WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId,
List *ddlCommandList,
List *foreignConstraintCommandList);
extern Oid ForeignConstraintGetReferencedTableId(char *queryString);
extern void CheckHashPartitionedTable(Oid distributedTableId);
extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName,

View File

@ -39,6 +39,8 @@ extern char * pchomp(const char *in);
extern void LogRemoteCommand(MultiConnection *connection, const char *command);
/* wrappers around libpq functions, with command logging support */
extern void ExecuteCriticalRemoteCommandList(MultiConnection *connection,
List *commandList);
extern void ExecuteCriticalRemoteCommand(MultiConnection *connection,
const char *command);
extern int ExecuteOptionalRemoteCommand(MultiConnection *connection,