Apply master_stage_protocol refactoring changes

pull/2242/head
Murat Tuncer 2018-06-28 11:24:57 +03:00
parent 86a3dd5a90
commit 3fc7cdfe6d
4 changed files with 60 additions and 45 deletions

View File

@ -381,7 +381,6 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
int placementsCreated = 0;
int attemptNumber = 0;
List *foreignConstraintCommandList = GetTableForeignConstraintCommands(relationId);
char *alterTableAttachPartitionCommand = NULL;
bool includeSequenceDefaults = false;
List *ddlCommandList = GetTableDDLEvents(relationId, includeSequenceDefaults);
uint32 connectionFlag = FOR_DDL;
@ -416,8 +415,7 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
}
WorkerCreateShard(relationId, shardIndex, shardId, ddlCommandList,
foreignConstraintCommandList, alterTableAttachPartitionCommand,
connection);
foreignConstraintCommandList, connection);
InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardSize,
nodeGroupId);
@ -499,7 +497,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
ListCell *connectionCell = NULL;
ListCell *shardPlacementCell = NULL;
int connectionFlags = FOR_DDL;
char *alterTableAttachPartitionCommand = NULL;
bool partitionTable = PartitionTable(distributedRelationId);
if (useExclusiveConnection)
{
@ -507,12 +505,6 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
}
if (PartitionTable(distributedRelationId))
{
alterTableAttachPartitionCommand =
GenerateAlterTableAttachPartitionCommand(distributedRelationId);
}
BeginOrContinueCoordinatedTransaction();
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC ||
@ -527,7 +519,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
RecordParallelDDLAccess(distributedRelationId);
/* we should mark the parent as well */
if (alterTableAttachPartitionCommand != NULL)
if (partitionTable)
{
Oid parentRelationId = PartitionParentOid(distributedRelationId);
RecordParallelDDLAccess(parentRelationId);
@ -552,8 +544,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
* with DDL. This is only important for parallel relation access in transaction
* blocks, thus check useExclusiveConnection and transaction block as well.
*/
if ((ShouldRecordRelationAccess() && useExclusiveConnection) &&
alterTableAttachPartitionCommand != NULL)
if (ShouldRecordRelationAccess() && useExclusiveConnection && partitionTable)
{
RelationShard *parentRelationShard = CitusMakeNode(RelationShard);
RelationShard *partitionRelationShard = CitusMakeNode(RelationShard);
@ -589,8 +580,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
MarkRemoteTransactionCritical(connection);
WorkerCreateShard(distributedRelationId, shardIndex, shardId,
ddlCommandList, foreignConstraintCommandList,
alterTableAttachPartitionCommand, connection);
ddlCommandList, foreignConstraintCommandList, connection);
}
/*
@ -613,8 +603,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
*/
void
WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlCommandList,
List *foreignConstraintCommandList,
char *alterTableAttachPartitionCommand, MultiConnection *connection)
List *foreignConstraintCommandList, MultiConnection *connection)
{
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
@ -697,34 +686,11 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma
* If the shard is created for a partition, send the command to create the
* partitioning hierarcy on the shard.
*/
if (alterTableAttachPartitionCommand != NULL)
if (PartitionTable(relationId))
{
Oid parentRelationId = PartitionParentOid(relationId);
uint64 correspondingParentShardId = InvalidOid;
StringInfo applyAttachPartitionCommand = makeStringInfo();
Oid parentSchemaId = InvalidOid;
char *parentSchemaName = NULL;
char *escapedParentSchemaName = NULL;
char *escapedCommand = NULL;
Assert(PartitionTable(relationId));
parentSchemaId = get_rel_namespace(parentRelationId);
parentSchemaName = get_namespace_name(parentSchemaId);
escapedParentSchemaName = quote_literal_cstr(parentSchemaName);
escapedCommand = quote_literal_cstr(alterTableAttachPartitionCommand);
correspondingParentShardId = ColocatedShardIdInRelation(parentRelationId,
shardIndex);
appendStringInfo(applyAttachPartitionCommand,
WORKER_APPLY_INTER_SHARD_DDL_COMMAND, correspondingParentShardId,
escapedParentSchemaName, shardId, escapedSchemaName,
escapedCommand);
ExecuteCriticalRemoteCommand(connection, applyAttachPartitionCommand->data);
ShardInterval *shardInterval = LoadShardInterval(shardId);
char *attachPartitionCommand = GenerateAttachShardPartitionCommand(shardInterval);
ExecuteCriticalRemoteCommand(connection, attachPartitionCommand);
}
}

View File

@ -19,7 +19,11 @@
#include "catalog/pg_constraint_fn.h"
#endif
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/shardinterval_utils.h"
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
#include "utils/builtins.h"
@ -342,6 +346,50 @@ GeneratePartitioningInformation(Oid parentTableId)
}
/*
* GenerateAttachShardPartitionCommand generates command to attach a child table
* table to its parent in a partitioning hierarchy.
*/
char *
GenerateAttachShardPartitionCommand(ShardInterval *shardInterval)
{
Oid schemaId = get_rel_namespace(shardInterval->relationId);
char *schemaName = get_namespace_name(schemaId);
char *escapedSchemaName = quote_literal_cstr(schemaName);
char *command = GenerateAlterTableAttachPartitionCommand(shardInterval->relationId);
char *escapedCommand = quote_literal_cstr(command);
int shardIndex = ShardIndex(shardInterval);
Oid parentSchemaId = InvalidOid;
char *parentSchemaName = NULL;
char *escapedParentSchemaName = NULL;
uint64 parentShardId = INVALID_SHARD_ID;
StringInfo attachPartitionCommand = makeStringInfo();
Oid parentRelationId = PartitionParentOid(shardInterval->relationId);
if (parentRelationId == InvalidOid)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot attach partition"),
errdetail("Referenced relation cannot be found.")));
}
parentSchemaId = get_rel_namespace(parentRelationId);
parentSchemaName = get_namespace_name(parentSchemaId);
escapedParentSchemaName = quote_literal_cstr(parentSchemaName);
parentShardId = ColocatedShardIdInRelation(parentRelationId, shardIndex);
appendStringInfo(attachPartitionCommand,
WORKER_APPLY_INTER_SHARD_DDL_COMMAND, parentShardId,
escapedParentSchemaName, shardInterval->shardId,
escapedSchemaName, escapedCommand);
return attachPartitionCommand->data;
}
/*
* GenerateAlterTableAttachPartitionCommand returns the necessary command to
* attach the given partition to its parent.

View File

@ -127,7 +127,6 @@ extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
extern void CreateReferenceTableShard(Oid distributedTableId);
extern void WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId,
List *ddlCommandList, List *foreignConstraintCommandList,
char *alterTableAttachPartitionCommand,
MultiConnection *connection);
extern Oid ForeignConstraintGetReferencedTableId(char *queryString);
extern void CheckHashPartitionedTable(Oid distributedTableId);

View File

@ -8,6 +8,7 @@
#define MULTI_PARTITIONING_UTILS_H_
#include "distributed/master_metadata_utility.h"
#include "nodes/pg_list.h"
@ -20,6 +21,7 @@ extern bool IsParentTable(Oid relationId);
extern Oid PartitionParentOid(Oid partitionOid);
extern List * PartitionList(Oid parentRelationId);
extern char * GenerateDetachPartitionCommand(Oid partitionTableId);
extern char * GenerateAttachShardPartitionCommand(ShardInterval *shardInterval);
extern char * GenerateAlterTableAttachPartitionCommand(Oid partitionTableId);
extern char * GeneratePartitioningInformation(Oid tableId);