diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 21d8bd56e..a8b13fa31 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -363,6 +363,24 @@ LogRemoteCommand(MultiConnection *connection, const char *command) /* wrappers around libpq functions, with command logging support */ +/* + * ExecuteCriticalRemoteCommandList calls ExecuteCriticalRemoteCommand for every + * command in the commandList. + */ +void +ExecuteCriticalRemoteCommandList(MultiConnection *connection, List *commandList) +{ + ListCell *commandCell = NULL; + + foreach(commandCell, commandList) + { + char *command = (char *) lfirst(commandCell); + + ExecuteCriticalRemoteCommand(connection, command); + } +} + + /* * ExecuteCriticalRemoteCommand executes a remote command that is critical * to the transaction. If the command fails then the transaction aborts. diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 95a69a62a..df820475e 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -58,6 +58,7 @@ /* Local functions forward declarations */ +static List * RelationShardListForShardCreate(ShardInterval *shardInterval); static bool WorkerShardStats(ShardPlacement *placement, Oid relationId, char *shardName, uint64 *shardSize, text **shardMinValue, text **shardMaxValue); @@ -403,6 +404,7 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId, MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort, relationOwner, NULL); + List *commandList = NIL; if (PQstatus(connection->pgConn) != CONNECTION_OK) { @@ -412,8 +414,11 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId, continue; } - WorkerCreateShard(relationId, shardIndex, shardId, ddlCommandList, - foreignConstraintCommandList, connection); + commandList = WorkerCreateShardCommandList(relationId, shardIndex, shardId, + ddlCommandList, + foreignConstraintCommandList); + + ExecuteCriticalRemoteCommandList(connection, commandList); InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardSize, nodeGroupId); @@ -529,8 +534,10 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell); uint64 shardId = shardPlacement->shardId; ShardInterval *shardInterval = LoadShardInterval(shardId); + List *relationShardList = NIL; MultiConnection *connection = NULL; int shardIndex = -1; + List *commandList = NIL; if (colocatedShard) { @@ -544,18 +551,10 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, */ if (ShouldRecordRelationAccess() && useExclusiveConnection && partitionTable) { - RelationShard *parentRelationShard = CitusMakeNode(RelationShard); - RelationShard *partitionRelationShard = CitusMakeNode(RelationShard); - List *relationShardList = NIL; List *placementAccessList = NIL; - parentRelationShard->relationId = PartitionParentOid(distributedRelationId); - parentRelationShard->shardId = - ColocatedShardIdInRelation(parentRelationShard->relationId, shardIndex); - partitionRelationShard->relationId = distributedRelationId; - partitionRelationShard->shardId = shardId; + relationShardList = RelationShardListForShardCreate(shardInterval); - relationShardList = list_make2(parentRelationShard, partitionRelationShard); placementAccessList = BuildPlacementDDLList(shardPlacement->groupId, relationShardList); @@ -577,8 +576,12 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, RemoteTransactionBeginIfNecessary(connection); MarkRemoteTransactionCritical(connection); - WorkerCreateShard(distributedRelationId, shardIndex, shardId, - ddlCommandList, foreignConstraintCommandList, connection); + commandList = WorkerCreateShardCommandList(distributedRelationId, shardIndex, + shardId, + ddlCommandList, + foreignConstraintCommandList); + + ExecuteCriticalRemoteCommandList(connection, commandList); } /* @@ -595,14 +598,110 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, /* - * WorkerCreateShard applies DDL commands for the given shardId to create the - * shard on the worker node. Commands are sent to the worker node over the - * given connection. + * RelationShardListForShardCreate gets a shard interval and returns the placement + * accesses that would happen when a placement of the shard interval is created. */ -void -WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlCommandList, - List *foreignConstraintCommandList, MultiConnection *connection) +static List * +RelationShardListForShardCreate(ShardInterval *shardInterval) { + List *relationShardList = NIL; + RelationShard *relationShard = NULL; + Oid relationId = shardInterval->relationId; + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId); + List *referencedRelationList = cacheEntry->referencedRelationsViaForeignKey; + List *referencingRelationList = cacheEntry->referencingRelationsViaForeignKey; + List *allForeignKeyRelations = + list_concat_unique_oid(referencedRelationList, referencingRelationList); + int shardIndex = -1; + ListCell *fkeyRelationIdCell = NULL; + + /* record the placement access of the shard itself */ + relationShard = CitusMakeNode(RelationShard); + relationShard->relationId = relationId; + relationShard->shardId = shardInterval->shardId; + relationShardList = list_make1(relationShard); + + if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH && + cacheEntry->colocationId != INVALID_COLOCATION_ID) + { + shardIndex = ShardIndex(shardInterval); + } + + + /* all foregin key constraint relations */ + foreach(fkeyRelationIdCell, allForeignKeyRelations) + { + Oid fkeyRelationid = lfirst_oid(fkeyRelationIdCell); + RelationShard *fkeyRelationShard = NULL; + uint64 fkeyShardId = INVALID_SHARD_ID; + + if (!IsDistributedTable(fkeyRelationid)) + { + /* we're not interested in local tables */ + continue; + } + + if (PartitionMethod(fkeyRelationid) == DISTRIBUTE_BY_NONE) + { + fkeyShardId = GetFirstShardId(fkeyRelationid); + } + else if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH && + PartitionMethod(fkeyRelationid) == DISTRIBUTE_BY_HASH) + { + /* hash distributed tables should be colocated to have fkey */ + Assert(TableColocationId(fkeyRelationid) == cacheEntry->colocationId); + + fkeyShardId = + ColocatedShardIdInRelation(fkeyRelationid, shardIndex); + } + else + { + /* + * We currently do not support foreign keys from/to local tables or + * non-colocated tables when creating shards. Also note that shard + * creation via shard moves doesn't happen in a transaction block, + * so not relevant here. + */ + continue; + } + + fkeyRelationShard = CitusMakeNode(RelationShard); + fkeyRelationShard->relationId = fkeyRelationid; + fkeyRelationShard->shardId = fkeyShardId; + + relationShardList = lappend(relationShardList, fkeyRelationShard); + } + + + /* if partitioned table, make sure to record the parent table */ + if (PartitionTable(relationId)) + { + RelationShard *parentRelationShard = CitusMakeNode(RelationShard); + + /* partitioned tables are always co-located */ + Assert(shardIndex != -1); + + parentRelationShard->relationId = PartitionParentOid(relationId); + parentRelationShard->shardId = + ColocatedShardIdInRelation(parentRelationShard->relationId, shardIndex); + + relationShardList = lappend(relationShardList, parentRelationShard); + } + + return relationShardList; +} + + +/* + * WorkerCreateShardCommandList returns a list of DDL commands for the given + * shardId to create the shard on the worker node. + */ +List * +WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId, + List *ddlCommandList, + List *foreignConstraintCommandList) +{ + List *commandList = NIL; Oid schemaId = get_rel_namespace(relationId); char *schemaName = get_namespace_name(schemaId); char *escapedSchemaName = quote_literal_cstr(schemaName); @@ -627,7 +726,7 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma escapedDDLCommand); } - ExecuteCriticalRemoteCommand(connection, applyDDLCommand->data); + commandList = lappend(commandList, applyDDLCommand->data); } foreach(foreignConstraintCommandCell, foreignConstraintCommandList) @@ -683,8 +782,7 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma WORKER_APPLY_INTER_SHARD_DDL_COMMAND, shardId, escapedSchemaName, referencedShardId, escapedReferencedSchemaName, escapedCommand); - - ExecuteCriticalRemoteCommand(connection, applyForeignConstraintCommand->data); + commandList = lappend(commandList, applyForeignConstraintCommand->data); } /* @@ -695,8 +793,11 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma { ShardInterval *shardInterval = LoadShardInterval(shardId); char *attachPartitionCommand = GenerateAttachShardPartitionCommand(shardInterval); - ExecuteCriticalRemoteCommand(connection, attachPartitionCommand); + + commandList = lappend(commandList, attachPartitionCommand); } + + return commandList; } diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 06c68e4ea..155aab574 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -125,9 +125,9 @@ extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shard extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool useExclusiveConnections); extern void CreateReferenceTableShard(Oid distributedTableId); -extern void WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, - List *ddlCommandList, List *foreignConstraintCommandList, - MultiConnection *connection); +extern List * WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId, + List *ddlCommandList, + List *foreignConstraintCommandList); extern Oid ForeignConstraintGetReferencedTableId(char *queryString); extern void CheckHashPartitionedTable(Oid distributedTableId); extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName, diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 5a55a25a1..0321feb2d 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -39,6 +39,8 @@ extern char * pchomp(const char *in); extern void LogRemoteCommand(MultiConnection *connection, const char *command); /* wrappers around libpq functions, with command logging support */ +extern void ExecuteCriticalRemoteCommandList(MultiConnection *connection, + List *commandList); extern void ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command); extern int ExecuteOptionalRemoteCommand(MultiConnection *connection,