Refactor code

Refactor code to handle errors before distributed table creation
pull/1288/head
velioglu 2017-03-20 17:02:17 +03:00
parent 821dead6ca
commit 4390bc09c3
1 changed files with 43 additions and 29 deletions

View File

@ -117,7 +117,7 @@ static bool IsSupportedDistributedVacuumStmt(Oid relationId, VacuumStmt *vacuumS
static List * VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt); static List * VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt);
static StringInfo DeparseVacuumStmtPrefix(VacuumStmt *vacuumStmt); static StringInfo DeparseVacuumStmtPrefix(VacuumStmt *vacuumStmt);
static char * DeparseVacuumColumnNames(List *columnNameList); static char * DeparseVacuumColumnNames(List *columnNameList);
static void ErrorIfUnsupportedAlterAddConstraintStmt(AlterTableStmt *alterTableStatement);
/* Local functions forward declarations for unsupported command checks */ /* Local functions forward declarations for unsupported command checks */
static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement); static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement);
@ -376,7 +376,7 @@ multi_ProcessUtility(Node *parsetree,
{ {
AlterTableStmt *alterTableStatement = (AlterTableStmt *) parsetree; AlterTableStmt *alterTableStatement = (AlterTableStmt *) parsetree;
ErrorIfUnsupportedAlterTableStmt(alterTableStatement); ErrorIfUnsupportedAlterAddConstraintStmt(alterTableStatement);
} }
if (commandMustRunAsOwner) if (commandMustRunAsOwner)
@ -825,6 +825,8 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo
return NULL; return NULL;
} }
ErrorIfUnsupportedAlterTableStmt(alterTableStatement);
/* /*
* We check if there is a ADD FOREIGN CONSTRAINT command in sub commands list. * We check if there is a ADD FOREIGN CONSTRAINT command in sub commands list.
* If there is we assign referenced relation id to rightRelationId and we also * If there is we assign referenced relation id to rightRelationId and we also
@ -1362,6 +1364,40 @@ ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement)
} }
static void
ErrorIfUnsupportedAlterAddConstraintStmt(AlterTableStmt *alterTableStatement)
{
Oid relationId = InvalidOid;
bool isDistributedRelation = false;
LOCKMODE lockmode = 0;
Relation relation = NULL;
char distributionMethod;
Var *distributionColumn = NULL;
uint32 colocationId = 0;
/* continue if it is not distributed table */
lockmode = AlterTableGetLockLevel(alterTableStatement->cmds);
relationId = AlterTableLookupRelation(alterTableStatement, lockmode);
isDistributedRelation = IsDistributedTable(relationId);
if (!isDistributedRelation)
{
return;
}
distributionMethod = PartitionMethod(relationId);
distributionColumn = PartitionKey(relationId);
colocationId = TableColocationId(relationId);
relation = relation_open(relationId, ExclusiveLock);
ErrorIfNotSupportedConstraint(relation, distributionMethod,
distributionColumn, colocationId);
relation_close(relation, NoLock);
}
/* /*
* ErrorIfUnsupportedAlterTableStmt checks if the corresponding alter table statement * ErrorIfUnsupportedAlterTableStmt checks if the corresponding alter table statement
* is supported for distributed tables and errors out if it is not. Currently, * is supported for distributed tables and errors out if it is not. Currently,
@ -1378,19 +1414,6 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
{ {
List *commandList = alterTableStatement->cmds; List *commandList = alterTableStatement->cmds;
ListCell *commandCell = NULL; ListCell *commandCell = NULL;
Oid relationId = InvalidOid;
bool isDistributedRelation = false;
LOCKMODE lockmode = 0;
/* error out if table is not distributed */
lockmode = AlterTableGetLockLevel(alterTableStatement->cmds);
relationId = AlterTableLookupRelation(alterTableStatement, lockmode);
isDistributedRelation = IsDistributedTable(relationId);
if (!isDistributedRelation)
{
return;
}
/* error out if any of the subcommands are unsupported */ /* error out if any of the subcommands are unsupported */
foreach(commandCell, commandList) foreach(commandCell, commandList)
@ -1443,6 +1466,11 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
Var *partitionColumn = NULL; Var *partitionColumn = NULL;
HeapTuple tuple = NULL; HeapTuple tuple = NULL;
char *alterColumnName = command->name; char *alterColumnName = command->name;
Oid relationId = InvalidOid;
LOCKMODE lockmode = 0;
lockmode = AlterTableGetLockLevel(alterTableStatement->cmds);
relationId = AlterTableLookupRelation(alterTableStatement, lockmode);
if (!OidIsValid(relationId)) if (!OidIsValid(relationId))
{ {
@ -1474,11 +1502,6 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
{ {
Constraint *constraint = (Constraint *) command->def; Constraint *constraint = (Constraint *) command->def;
Relation relation = NULL;
char distributionMethod;
Var *distributionColumn = NULL;
uint32 colocationId = 0;
/* we only allow constraints if they are only subcommand */ /* we only allow constraints if they are only subcommand */
if (commandList->length > 1) if (commandList->length > 1)
{ {
@ -1502,15 +1525,6 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
"supported."))); "supported.")));
} }
distributionMethod = PartitionMethod(relationId);
distributionColumn = PartitionKey(relationId);
colocationId = TableColocationId(relationId);
relation = relation_open(relationId, ExclusiveLock);
ErrorIfNotSupportedConstraint(relation, distributionMethod,
distributionColumn, colocationId);
relation_close(relation, NoLock);
break; break;
} }