From 4fd1fcbbefe5c64b2a760a78a59ef3b4bd3f8cf1 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Mon, 17 Jun 2019 16:12:51 +0200 Subject: [PATCH] Refactor shard creation logic This is a preperation for the new executor, where creating shards would go through the executor. So, explicitly generate the commands for further processing. --- .../distributed/connection/remote_commands.c | 18 ++++++++ .../master/master_stage_protocol.c | 41 ++++++++++++------- src/include/distributed/master_protocol.h | 6 +-- src/include/distributed/remote_commands.h | 2 + 4 files changed, 50 insertions(+), 17 deletions(-) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 21d8bd56e..a8b13fa31 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -363,6 +363,24 @@ LogRemoteCommand(MultiConnection *connection, const char *command) /* wrappers around libpq functions, with command logging support */ +/* + * ExecuteCriticalRemoteCommandList calls ExecuteCriticalRemoteCommand for every + * command in the commandList. + */ +void +ExecuteCriticalRemoteCommandList(MultiConnection *connection, List *commandList) +{ + ListCell *commandCell = NULL; + + foreach(commandCell, commandList) + { + char *command = (char *) lfirst(commandCell); + + ExecuteCriticalRemoteCommand(connection, command); + } +} + + /* * ExecuteCriticalRemoteCommand executes a remote command that is critical * to the transaction. If the command fails then the transaction aborts. diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 95a69a62a..a70110d5c 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -403,6 +403,7 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId, MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort, relationOwner, NULL); + List *commandList = NIL; if (PQstatus(connection->pgConn) != CONNECTION_OK) { @@ -412,8 +413,11 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId, continue; } - WorkerCreateShard(relationId, shardIndex, shardId, ddlCommandList, - foreignConstraintCommandList, connection); + commandList = WorkerCreateShardCommandList(relationId, shardIndex, shardId, + ddlCommandList, + foreignConstraintCommandList); + + ExecuteCriticalRemoteCommandList(connection, commandList); InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardSize, nodeGroupId); @@ -531,6 +535,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, ShardInterval *shardInterval = LoadShardInterval(shardId); MultiConnection *connection = NULL; int shardIndex = -1; + List *commandList = NIL; if (colocatedShard) { @@ -577,8 +582,12 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, RemoteTransactionBeginIfNecessary(connection); MarkRemoteTransactionCritical(connection); - WorkerCreateShard(distributedRelationId, shardIndex, shardId, - ddlCommandList, foreignConstraintCommandList, connection); + commandList = WorkerCreateShardCommandList(distributedRelationId, shardIndex, + shardId, + ddlCommandList, + foreignConstraintCommandList); + + ExecuteCriticalRemoteCommandList(connection, commandList); } /* @@ -595,14 +604,16 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, /* - * WorkerCreateShard applies DDL commands for the given shardId to create the - * shard on the worker node. Commands are sent to the worker node over the - * given connection. + * WorkerCreateShardCommandList returns a list of DDL commands for the given + * shardId to create the shard on the worker node. + * */ -void -WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlCommandList, - List *foreignConstraintCommandList, MultiConnection *connection) +List * +WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId, + List *ddlCommandList, + List *foreignConstraintCommandList) { + List *commandList = NIL; Oid schemaId = get_rel_namespace(relationId); char *schemaName = get_namespace_name(schemaId); char *escapedSchemaName = quote_literal_cstr(schemaName); @@ -627,7 +638,7 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma escapedDDLCommand); } - ExecuteCriticalRemoteCommand(connection, applyDDLCommand->data); + commandList = lappend(commandList, applyDDLCommand->data); } foreach(foreignConstraintCommandCell, foreignConstraintCommandList) @@ -683,8 +694,7 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma WORKER_APPLY_INTER_SHARD_DDL_COMMAND, shardId, escapedSchemaName, referencedShardId, escapedReferencedSchemaName, escapedCommand); - - ExecuteCriticalRemoteCommand(connection, applyForeignConstraintCommand->data); + commandList = lappend(commandList, applyForeignConstraintCommand->data); } /* @@ -695,8 +705,11 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma { ShardInterval *shardInterval = LoadShardInterval(shardId); char *attachPartitionCommand = GenerateAttachShardPartitionCommand(shardInterval); - ExecuteCriticalRemoteCommand(connection, attachPartitionCommand); + + commandList = lappend(commandList, attachPartitionCommand); } + + return commandList; } diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 06c68e4ea..155aab574 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -125,9 +125,9 @@ extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shard extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool useExclusiveConnections); extern void CreateReferenceTableShard(Oid distributedTableId); -extern void WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, - List *ddlCommandList, List *foreignConstraintCommandList, - MultiConnection *connection); +extern List * WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId, + List *ddlCommandList, + List *foreignConstraintCommandList); extern Oid ForeignConstraintGetReferencedTableId(char *queryString); extern void CheckHashPartitionedTable(Oid distributedTableId); extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName, diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 5a55a25a1..0321feb2d 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -39,6 +39,8 @@ extern char * pchomp(const char *in); extern void LogRemoteCommand(MultiConnection *connection, const char *command); /* wrappers around libpq functions, with command logging support */ +extern void ExecuteCriticalRemoteCommandList(MultiConnection *connection, + List *commandList); extern void ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command); extern int ExecuteOptionalRemoteCommand(MultiConnection *connection,