From 3fc7cdfe6d12d857b19a77e7f207788b7a5b08a3 Mon Sep 17 00:00:00 2001 From: Murat Tuncer Date: Thu, 28 Jun 2018 11:24:57 +0300 Subject: [PATCH] Apply master_stage_protocol refactoring changes --- .../master/master_stage_protocol.c | 54 ++++--------------- .../utils/multi_partitioning_utils.c | 48 +++++++++++++++++ src/include/distributed/master_protocol.h | 1 - .../distributed/multi_partitioning_utils.h | 2 + 4 files changed, 60 insertions(+), 45 deletions(-) diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 3fe2bc8b0..e904d76a5 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -381,7 +381,6 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId, int placementsCreated = 0; int attemptNumber = 0; List *foreignConstraintCommandList = GetTableForeignConstraintCommands(relationId); - char *alterTableAttachPartitionCommand = NULL; bool includeSequenceDefaults = false; List *ddlCommandList = GetTableDDLEvents(relationId, includeSequenceDefaults); uint32 connectionFlag = FOR_DDL; @@ -416,8 +415,7 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId, } WorkerCreateShard(relationId, shardIndex, shardId, ddlCommandList, - foreignConstraintCommandList, alterTableAttachPartitionCommand, - connection); + foreignConstraintCommandList, connection); InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardSize, nodeGroupId); @@ -499,7 +497,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, ListCell *connectionCell = NULL; ListCell *shardPlacementCell = NULL; int connectionFlags = FOR_DDL; - char *alterTableAttachPartitionCommand = NULL; + bool partitionTable = PartitionTable(distributedRelationId); if (useExclusiveConnection) { @@ -507,12 +505,6 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, } - if (PartitionTable(distributedRelationId)) - { - alterTableAttachPartitionCommand = - GenerateAlterTableAttachPartitionCommand(distributedRelationId); - } - BeginOrContinueCoordinatedTransaction(); if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC || @@ -527,7 +519,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, RecordParallelDDLAccess(distributedRelationId); /* we should mark the parent as well */ - if (alterTableAttachPartitionCommand != NULL) + if (partitionTable) { Oid parentRelationId = PartitionParentOid(distributedRelationId); RecordParallelDDLAccess(parentRelationId); @@ -552,8 +544,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, * with DDL. This is only important for parallel relation access in transaction * blocks, thus check useExclusiveConnection and transaction block as well. */ - if ((ShouldRecordRelationAccess() && useExclusiveConnection) && - alterTableAttachPartitionCommand != NULL) + if (ShouldRecordRelationAccess() && useExclusiveConnection && partitionTable) { RelationShard *parentRelationShard = CitusMakeNode(RelationShard); RelationShard *partitionRelationShard = CitusMakeNode(RelationShard); @@ -589,8 +580,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, MarkRemoteTransactionCritical(connection); WorkerCreateShard(distributedRelationId, shardIndex, shardId, - ddlCommandList, foreignConstraintCommandList, - alterTableAttachPartitionCommand, connection); + ddlCommandList, foreignConstraintCommandList, connection); } /* @@ -613,8 +603,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, */ void WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlCommandList, - List *foreignConstraintCommandList, - char *alterTableAttachPartitionCommand, MultiConnection *connection) + List *foreignConstraintCommandList, MultiConnection *connection) { Oid schemaId = get_rel_namespace(relationId); char *schemaName = get_namespace_name(schemaId); @@ -697,34 +686,11 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma * If the shard is created for a partition, send the command to create the * partitioning hierarcy on the shard. */ - if (alterTableAttachPartitionCommand != NULL) + if (PartitionTable(relationId)) { - Oid parentRelationId = PartitionParentOid(relationId); - uint64 correspondingParentShardId = InvalidOid; - StringInfo applyAttachPartitionCommand = makeStringInfo(); - - Oid parentSchemaId = InvalidOid; - char *parentSchemaName = NULL; - char *escapedParentSchemaName = NULL; - char *escapedCommand = NULL; - - Assert(PartitionTable(relationId)); - - parentSchemaId = get_rel_namespace(parentRelationId); - parentSchemaName = get_namespace_name(parentSchemaId); - escapedParentSchemaName = quote_literal_cstr(parentSchemaName); - escapedCommand = quote_literal_cstr(alterTableAttachPartitionCommand); - - correspondingParentShardId = ColocatedShardIdInRelation(parentRelationId, - shardIndex); - - appendStringInfo(applyAttachPartitionCommand, - WORKER_APPLY_INTER_SHARD_DDL_COMMAND, correspondingParentShardId, - escapedParentSchemaName, shardId, escapedSchemaName, - escapedCommand); - - - ExecuteCriticalRemoteCommand(connection, applyAttachPartitionCommand->data); + ShardInterval *shardInterval = LoadShardInterval(shardId); + char *attachPartitionCommand = GenerateAttachShardPartitionCommand(shardInterval); + ExecuteCriticalRemoteCommand(connection, attachPartitionCommand); } } diff --git a/src/backend/distributed/utils/multi_partitioning_utils.c b/src/backend/distributed/utils/multi_partitioning_utils.c index 4659f3a5f..28b276959 100644 --- a/src/backend/distributed/utils/multi_partitioning_utils.c +++ b/src/backend/distributed/utils/multi_partitioning_utils.c @@ -19,7 +19,11 @@ #include "catalog/pg_constraint_fn.h" #endif #include "distributed/citus_ruleutils.h" +#include "distributed/colocation_utils.h" +#include "distributed/master_metadata_utility.h" +#include "distributed/master_protocol.h" #include "distributed/multi_partitioning_utils.h" +#include "distributed/shardinterval_utils.h" #include "lib/stringinfo.h" #include "nodes/pg_list.h" #include "utils/builtins.h" @@ -342,6 +346,50 @@ GeneratePartitioningInformation(Oid parentTableId) } +/* + * GenerateAttachShardPartitionCommand generates command to attach a child table + * table to its parent in a partitioning hierarchy. + */ +char * +GenerateAttachShardPartitionCommand(ShardInterval *shardInterval) +{ + Oid schemaId = get_rel_namespace(shardInterval->relationId); + char *schemaName = get_namespace_name(schemaId); + char *escapedSchemaName = quote_literal_cstr(schemaName); + + char *command = GenerateAlterTableAttachPartitionCommand(shardInterval->relationId); + char *escapedCommand = quote_literal_cstr(command); + int shardIndex = ShardIndex(shardInterval); + + Oid parentSchemaId = InvalidOid; + char *parentSchemaName = NULL; + char *escapedParentSchemaName = NULL; + uint64 parentShardId = INVALID_SHARD_ID; + + StringInfo attachPartitionCommand = makeStringInfo(); + + Oid parentRelationId = PartitionParentOid(shardInterval->relationId); + if (parentRelationId == InvalidOid) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot attach partition"), + errdetail("Referenced relation cannot be found."))); + } + + parentSchemaId = get_rel_namespace(parentRelationId); + parentSchemaName = get_namespace_name(parentSchemaId); + escapedParentSchemaName = quote_literal_cstr(parentSchemaName); + parentShardId = ColocatedShardIdInRelation(parentRelationId, shardIndex); + + appendStringInfo(attachPartitionCommand, + WORKER_APPLY_INTER_SHARD_DDL_COMMAND, parentShardId, + escapedParentSchemaName, shardInterval->shardId, + escapedSchemaName, escapedCommand); + + return attachPartitionCommand->data; +} + + /* * GenerateAlterTableAttachPartitionCommand returns the necessary command to * attach the given partition to its parent. diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 3f0c7eae1..018073545 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -127,7 +127,6 @@ extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, extern void CreateReferenceTableShard(Oid distributedTableId); extern void WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlCommandList, List *foreignConstraintCommandList, - char *alterTableAttachPartitionCommand, MultiConnection *connection); extern Oid ForeignConstraintGetReferencedTableId(char *queryString); extern void CheckHashPartitionedTable(Oid distributedTableId); diff --git a/src/include/distributed/multi_partitioning_utils.h b/src/include/distributed/multi_partitioning_utils.h index d780aacf1..5c082ce51 100644 --- a/src/include/distributed/multi_partitioning_utils.h +++ b/src/include/distributed/multi_partitioning_utils.h @@ -8,6 +8,7 @@ #define MULTI_PARTITIONING_UTILS_H_ +#include "distributed/master_metadata_utility.h" #include "nodes/pg_list.h" @@ -20,6 +21,7 @@ extern bool IsParentTable(Oid relationId); extern Oid PartitionParentOid(Oid partitionOid); extern List * PartitionList(Oid parentRelationId); extern char * GenerateDetachPartitionCommand(Oid partitionTableId); +extern char * GenerateAttachShardPartitionCommand(ShardInterval *shardInterval); extern char * GenerateAlterTableAttachPartitionCommand(Oid partitionTableId); extern char * GeneratePartitioningInformation(Oid tableId);