From 4fd1fcbbefe5c64b2a760a78a59ef3b4bd3f8cf1 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Mon, 17 Jun 2019 16:12:51 +0200 Subject: [PATCH 1/3] Refactor shard creation logic This is a preparation for the new executor, where creating shards would go through the executor. So, explicitly generate the commands for further processing. --- .../distributed/connection/remote_commands.c | 18 ++++++++ .../master/master_stage_protocol.c | 41 ++++++++++++------- src/include/distributed/master_protocol.h | 6 +-- src/include/distributed/remote_commands.h | 2 + 4 files changed, 50 insertions(+), 17 deletions(-) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 21d8bd56e..a8b13fa31 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -363,6 +363,24 @@ LogRemoteCommand(MultiConnection *connection, const char *command) /* wrappers around libpq functions, with command logging support */ +/* + * ExecuteCriticalRemoteCommandList calls ExecuteCriticalRemoteCommand for every + * command in the commandList. + */ +void +ExecuteCriticalRemoteCommandList(MultiConnection *connection, List *commandList) +{ + ListCell *commandCell = NULL; + + foreach(commandCell, commandList) + { + char *command = (char *) lfirst(commandCell); + + ExecuteCriticalRemoteCommand(connection, command); + } +} + + /* * ExecuteCriticalRemoteCommand executes a remote command that is critical * to the transaction. If the command fails then the transaction aborts. 
diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 95a69a62a..a70110d5c 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -403,6 +403,7 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId, MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort, relationOwner, NULL); + List *commandList = NIL; if (PQstatus(connection->pgConn) != CONNECTION_OK) { @@ -412,8 +413,11 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId, continue; } - WorkerCreateShard(relationId, shardIndex, shardId, ddlCommandList, - foreignConstraintCommandList, connection); + commandList = WorkerCreateShardCommandList(relationId, shardIndex, shardId, + ddlCommandList, + foreignConstraintCommandList); + + ExecuteCriticalRemoteCommandList(connection, commandList); InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardSize, nodeGroupId); @@ -531,6 +535,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, ShardInterval *shardInterval = LoadShardInterval(shardId); MultiConnection *connection = NULL; int shardIndex = -1; + List *commandList = NIL; if (colocatedShard) { @@ -577,8 +582,12 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, RemoteTransactionBeginIfNecessary(connection); MarkRemoteTransactionCritical(connection); - WorkerCreateShard(distributedRelationId, shardIndex, shardId, - ddlCommandList, foreignConstraintCommandList, connection); + commandList = WorkerCreateShardCommandList(distributedRelationId, shardIndex, + shardId, + ddlCommandList, + foreignConstraintCommandList); + + ExecuteCriticalRemoteCommandList(connection, commandList); } /* @@ -595,14 +604,16 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, /* - * WorkerCreateShard applies DDL commands for the given 
shardId to create the - * shard on the worker node. Commands are sent to the worker node over the - * given connection. + * WorkerCreateShardCommandList returns a list of DDL commands for the given + * shardId to create the shard on the worker node. + * */ -void -WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlCommandList, - List *foreignConstraintCommandList, MultiConnection *connection) +List * +WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId, + List *ddlCommandList, + List *foreignConstraintCommandList) { + List *commandList = NIL; Oid schemaId = get_rel_namespace(relationId); char *schemaName = get_namespace_name(schemaId); char *escapedSchemaName = quote_literal_cstr(schemaName); @@ -627,7 +638,7 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma escapedDDLCommand); } - ExecuteCriticalRemoteCommand(connection, applyDDLCommand->data); + commandList = lappend(commandList, applyDDLCommand->data); } foreach(foreignConstraintCommandCell, foreignConstraintCommandList) @@ -683,8 +694,7 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma WORKER_APPLY_INTER_SHARD_DDL_COMMAND, shardId, escapedSchemaName, referencedShardId, escapedReferencedSchemaName, escapedCommand); - - ExecuteCriticalRemoteCommand(connection, applyForeignConstraintCommand->data); + commandList = lappend(commandList, applyForeignConstraintCommand->data); } /* @@ -695,8 +705,11 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma { ShardInterval *shardInterval = LoadShardInterval(shardId); char *attachPartitionCommand = GenerateAttachShardPartitionCommand(shardInterval); - ExecuteCriticalRemoteCommand(connection, attachPartitionCommand); + + commandList = lappend(commandList, attachPartitionCommand); } + + return commandList; } diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 06c68e4ea..155aab574 
100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -125,9 +125,9 @@ extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shard extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool useExclusiveConnections); extern void CreateReferenceTableShard(Oid distributedTableId); -extern void WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, - List *ddlCommandList, List *foreignConstraintCommandList, - MultiConnection *connection); +extern List * WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId, + List *ddlCommandList, + List *foreignConstraintCommandList); extern Oid ForeignConstraintGetReferencedTableId(char *queryString); extern void CheckHashPartitionedTable(Oid distributedTableId); extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName, diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 5a55a25a1..0321feb2d 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -39,6 +39,8 @@ extern char * pchomp(const char *in); extern void LogRemoteCommand(MultiConnection *connection, const char *command); /* wrappers around libpq functions, with command logging support */ +extern void ExecuteCriticalRemoteCommandList(MultiConnection *connection, + List *commandList); extern void ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command); extern int ExecuteOptionalRemoteCommand(MultiConnection *connection, From 3a04374a9e5f572863c87cf5587aea4484e3c20f Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 18 Jun 2019 11:57:21 +0200 Subject: [PATCH 2/3] Refactor relation shard list creation during placement creation This change is to make further refactoring even simpler such as using the executor for shard creation. 
--- .../master/master_stage_protocol.c | 111 ++++++++++++++++-- 1 file changed, 101 insertions(+), 10 deletions(-) diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index a70110d5c..f32b86bbc 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -58,6 +58,7 @@ /* Local functions forward declarations */ +static List * RelationShardListForShardCreate(ShardInterval *shardInterval); static bool WorkerShardStats(ShardPlacement *placement, Oid relationId, char *shardName, uint64 *shardSize, text **shardMinValue, text **shardMaxValue); @@ -533,6 +534,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell); uint64 shardId = shardPlacement->shardId; ShardInterval *shardInterval = LoadShardInterval(shardId); + List *relationShardList = NIL; MultiConnection *connection = NULL; int shardIndex = -1; List *commandList = NIL; @@ -549,18 +551,14 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, */ if (ShouldRecordRelationAccess() && useExclusiveConnection && partitionTable) { - RelationShard *parentRelationShard = CitusMakeNode(RelationShard); - RelationShard *partitionRelationShard = CitusMakeNode(RelationShard); - List *relationShardList = NIL; List *placementAccessList = NIL; - parentRelationShard->relationId = PartitionParentOid(distributedRelationId); - parentRelationShard->shardId = - ColocatedShardIdInRelation(parentRelationShard->relationId, shardIndex); - partitionRelationShard->relationId = distributedRelationId; - partitionRelationShard->shardId = shardId; + /* we only need to calculate this once per shardInterval, not placement */ + if (relationShardList == NIL) + { + relationShardList = RelationShardListForShardCreate(shardInterval); + } - relationShardList = list_make2(parentRelationShard, 
partitionRelationShard); placementAccessList = BuildPlacementDDLList(shardPlacement->groupId, relationShardList); @@ -603,10 +601,103 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, } +/* + * RelationShardListForShardCreate gets a shard interval and returns the placement + * accesses that would happen when a placement of the shard interval is created. + */ +static List * +RelationShardListForShardCreate(ShardInterval *shardInterval) +{ + List *relationShardList = NIL; + RelationShard *relationShard = NULL; + Oid relationId = shardInterval->relationId; + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId); + List *referencedRelationList = cacheEntry->referencedRelationsViaForeignKey; + List *referencingRelationList = cacheEntry->referencingRelationsViaForeignKey; + List *allForeignKeyRelations = + list_concat_unique_oid(referencedRelationList, referencingRelationList); + int shardIndex = -1; + + /* record the placement access of the shard itself */ + relationShard = CitusMakeNode(RelationShard); + relationShard->relationId = relationId; + relationShard->shardId = shardInterval->shardId; + relationShardList = list_make1(relationShard); + + if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH && + cacheEntry->colocationId != INVALID_COLOCATION_ID) + { + shardIndex = ShardIndex(shardInterval); + } + + /* all foregin constraint relations */ + if (allForeignKeyRelations != NIL) + { + ListCell *relationIdCell = NULL; + + foreach(relationIdCell, allForeignKeyRelations) + { + Oid fkeyRelationid = lfirst_oid(relationIdCell); + RelationShard *fkeyRelationShard = NULL; + uint64 fkeyShardId = INVALID_SHARD_ID; + + if (!IsDistributedTable(fkeyRelationid)) + { + /* we're not interested in local tables */ + continue; + } + + if (PartitionMethod(fkeyRelationid) == DISTRIBUTE_BY_NONE) + { + fkeyShardId = GetFirstShardId(fkeyRelationid); + } + else if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH && + 
PartitionMethod(fkeyRelationid) == DISTRIBUTE_BY_HASH) + { + /* hash distributed tables should be colocated to have fkey */ + Assert(TableColocationId(fkeyRelationid) == cacheEntry->colocationId); + + fkeyShardId = + ColocatedShardIdInRelation(fkeyRelationid, shardIndex); + } + else + { + /* + * We currently do not support foreign keys from/to local tables or + * non-colocated tables when creating shards. Also note that shard + * creation via shard moves doesn't happen in a transaction block, + * so not relevant here. + */ + continue; + } + + fkeyRelationShard = CitusMakeNode(RelationShard); + fkeyRelationShard->relationId = fkeyRelationid; + fkeyRelationShard->shardId = fkeyShardId; + + relationShardList = lappend(relationShardList, fkeyRelationShard); + } + } + + /* if partitioned table, make sure to record the parent table */ + if (PartitionTable(relationId)) + { + RelationShard *parentRelationShard = CitusMakeNode(RelationShard); + + parentRelationShard->relationId = PartitionParentOid(relationId); + parentRelationShard->shardId = + ColocatedShardIdInRelation(parentRelationShard->relationId, shardIndex); + + relationShardList = lappend(relationShardList, parentRelationShard); + } + + return relationShardList; +} + + /* * WorkerCreateShardCommandList returns a list of DDL commands for the given * shardId to create the shard on the worker node. 
- * */ List * WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId, From 2b0c4accdac051ee2986f2072c002c75b5c3d568 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 19 Jun 2019 09:19:28 +0200 Subject: [PATCH 3/3] Apply feedback --- .../master/master_stage_protocol.c | 95 +++++++++---------- 1 file changed, 46 insertions(+), 49 deletions(-) diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index f32b86bbc..df820475e 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -553,11 +553,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, { List *placementAccessList = NIL; - /* we only need to calculate this once per shardInterval, not placement */ - if (relationShardList == NIL) - { - relationShardList = RelationShardListForShardCreate(shardInterval); - } + relationShardList = RelationShardListForShardCreate(shardInterval); placementAccessList = BuildPlacementDDLList(shardPlacement->groupId, relationShardList); @@ -617,6 +613,7 @@ RelationShardListForShardCreate(ShardInterval *shardInterval) List *allForeignKeyRelations = list_concat_unique_oid(referencedRelationList, referencingRelationList); int shardIndex = -1; + ListCell *fkeyRelationIdCell = NULL; /* record the placement access of the shard itself */ relationShard = CitusMakeNode(RelationShard); @@ -630,60 +627,60 @@ RelationShardListForShardCreate(ShardInterval *shardInterval) shardIndex = ShardIndex(shardInterval); } - /* all foregin constraint relations */ - if (allForeignKeyRelations != NIL) + + /* all foreign key constraint relations */ + foreach(fkeyRelationIdCell, allForeignKeyRelations) { - ListCell *relationIdCell = NULL; + Oid fkeyRelationid = lfirst_oid(fkeyRelationIdCell); + RelationShard *fkeyRelationShard = NULL; + uint64 fkeyShardId = INVALID_SHARD_ID; - foreach(relationIdCell, 
allForeignKeyRelations) + if (!IsDistributedTable(fkeyRelationid)) { - Oid fkeyRelationid = lfirst_oid(relationIdCell); - RelationShard *fkeyRelationShard = NULL; - uint64 fkeyShardId = INVALID_SHARD_ID; - - if (!IsDistributedTable(fkeyRelationid)) - { - /* we're not interested in local tables */ - continue; - } - - if (PartitionMethod(fkeyRelationid) == DISTRIBUTE_BY_NONE) - { - fkeyShardId = GetFirstShardId(fkeyRelationid); - } - else if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH && - PartitionMethod(fkeyRelationid) == DISTRIBUTE_BY_HASH) - { - /* hash distributed tables should be colocated to have fkey */ - Assert(TableColocationId(fkeyRelationid) == cacheEntry->colocationId); - - fkeyShardId = - ColocatedShardIdInRelation(fkeyRelationid, shardIndex); - } - else - { - /* - * We currently do not support foreign keys from/to local tables or - * non-colocated tables when creating shards. Also note that shard - * creation via shard moves doesn't happen in a transaction block, - * so not relevant here. - */ - continue; - } - - fkeyRelationShard = CitusMakeNode(RelationShard); - fkeyRelationShard->relationId = fkeyRelationid; - fkeyRelationShard->shardId = fkeyShardId; - - relationShardList = lappend(relationShardList, fkeyRelationShard); + /* we're not interested in local tables */ + continue; } + + if (PartitionMethod(fkeyRelationid) == DISTRIBUTE_BY_NONE) + { + fkeyShardId = GetFirstShardId(fkeyRelationid); + } + else if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH && + PartitionMethod(fkeyRelationid) == DISTRIBUTE_BY_HASH) + { + /* hash distributed tables should be colocated to have fkey */ + Assert(TableColocationId(fkeyRelationid) == cacheEntry->colocationId); + + fkeyShardId = + ColocatedShardIdInRelation(fkeyRelationid, shardIndex); + } + else + { + /* + * We currently do not support foreign keys from/to local tables or + * non-colocated tables when creating shards. 
Also note that shard + * creation via shard moves doesn't happen in a transaction block, + * so not relevant here. + */ + continue; + } + + fkeyRelationShard = CitusMakeNode(RelationShard); + fkeyRelationShard->relationId = fkeyRelationid; + fkeyRelationShard->shardId = fkeyShardId; + + relationShardList = lappend(relationShardList, fkeyRelationShard); } + /* if partitioned table, make sure to record the parent table */ if (PartitionTable(relationId)) { RelationShard *parentRelationShard = CitusMakeNode(RelationShard); + /* partitioned tables are always co-located */ + Assert(shardIndex != -1); + parentRelationShard->relationId = PartitionParentOid(relationId); parentRelationShard->shardId = ColocatedShardIdInRelation(parentRelationShard->relationId, shardIndex); @@ -697,7 +694,7 @@ RelationShardListForShardCreate(ShardInterval *shardInterval) /* * WorkerCreateShardCommandList returns a list of DDL commands for the given - * shardId to create the shard on the worker node. + * shardId to create the shard on the worker node. */ List * WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId,