mirror of https://github.com/citusdata/citus.git
Remove execution from stmt-specific util functions
Now have a single Execute call in the main body.pull/1278/head
parent
a64165767d
commit
419a4c3745
|
@ -101,17 +101,16 @@ static void VerifyTransmitStmt(CopyStmt *copyStatement);
|
||||||
/* Local functions forward declarations for processing distributed table commands */
|
/* Local functions forward declarations for processing distributed table commands */
|
||||||
static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
|
static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
|
||||||
bool *commandMustRunAsOwner);
|
bool *commandMustRunAsOwner);
|
||||||
static Node * PlanIndexStmt(IndexStmt *createIndexStatement,
|
static DDLJob * PlanIndexStmt(IndexStmt *createIndexStatement,
|
||||||
const char *createIndexCommand, bool isTopLevel);
|
const char *createIndexCommand);
|
||||||
static Node * PlanDropIndexStmt(DropStmt *dropIndexStatement,
|
static DDLJob * PlanDropIndexStmt(DropStmt *dropIndexStatement,
|
||||||
const char *dropIndexCommand, bool isTopLevel);
|
const char *dropIndexCommand);
|
||||||
static Node * PlanAlterTableStmt(AlterTableStmt *alterTableStatement,
|
static DDLJob * PlanAlterTableStmt(AlterTableStmt *alterTableStatement,
|
||||||
const char *alterTableCommand, bool isTopLevel);
|
const char *alterTableCommand);
|
||||||
static Node * WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
|
static Node * WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
|
||||||
const char *alterTableCommand);
|
const char *alterTableCommand);
|
||||||
static Node * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
static DDLJob * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
||||||
const char *alterObjectSchemaCommand,
|
const char *alterObjectSchemaCommand);
|
||||||
bool isTopLevel);
|
|
||||||
static void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);
|
static void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);
|
||||||
static bool IsSupportedDistributedVacuumStmt(Oid relationId, VacuumStmt *vacuumStmt);
|
static bool IsSupportedDistributedVacuumStmt(Oid relationId, VacuumStmt *vacuumStmt);
|
||||||
static List * VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt);
|
static List * VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt);
|
||||||
|
@ -253,12 +252,11 @@ multi_ProcessUtility(Node *parsetree,
|
||||||
*/
|
*/
|
||||||
if (EnableDDLPropagation)
|
if (EnableDDLPropagation)
|
||||||
{
|
{
|
||||||
bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL);
|
DDLJob *ddlJob = NULL;
|
||||||
|
|
||||||
if (IsA(parsetree, IndexStmt))
|
if (IsA(parsetree, IndexStmt))
|
||||||
{
|
{
|
||||||
parsetree = PlanIndexStmt((IndexStmt *) parsetree, queryString,
|
ddlJob = PlanIndexStmt((IndexStmt *) parsetree, queryString);
|
||||||
isTopLevel);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IsA(parsetree, DropStmt))
|
if (IsA(parsetree, DropStmt))
|
||||||
|
@ -266,7 +264,7 @@ multi_ProcessUtility(Node *parsetree,
|
||||||
DropStmt *dropStatement = (DropStmt *) parsetree;
|
DropStmt *dropStatement = (DropStmt *) parsetree;
|
||||||
if (dropStatement->removeType == OBJECT_INDEX)
|
if (dropStatement->removeType == OBJECT_INDEX)
|
||||||
{
|
{
|
||||||
parsetree = PlanDropIndexStmt(dropStatement, queryString, isTopLevel);
|
ddlJob = PlanDropIndexStmt(dropStatement, queryString);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -275,8 +273,7 @@ multi_ProcessUtility(Node *parsetree,
|
||||||
AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree;
|
AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree;
|
||||||
if (alterTableStmt->relkind == OBJECT_TABLE)
|
if (alterTableStmt->relkind == OBJECT_TABLE)
|
||||||
{
|
{
|
||||||
parsetree = PlanAlterTableStmt(alterTableStmt, queryString,
|
ddlJob = PlanAlterTableStmt(alterTableStmt, queryString);
|
||||||
isTopLevel);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -300,8 +297,7 @@ multi_ProcessUtility(Node *parsetree,
|
||||||
if (IsA(parsetree, AlterObjectSchemaStmt))
|
if (IsA(parsetree, AlterObjectSchemaStmt))
|
||||||
{
|
{
|
||||||
AlterObjectSchemaStmt *setSchemaStmt = (AlterObjectSchemaStmt *) parsetree;
|
AlterObjectSchemaStmt *setSchemaStmt = (AlterObjectSchemaStmt *) parsetree;
|
||||||
parsetree = PlanAlterObjectSchemaStmt(setSchemaStmt, queryString,
|
ddlJob = PlanAlterObjectSchemaStmt(setSchemaStmt, queryString);
|
||||||
isTopLevel);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -316,6 +312,11 @@ multi_ProcessUtility(Node *parsetree,
|
||||||
errhint("Connect to worker nodes directly to manually "
|
errhint("Connect to worker nodes directly to manually "
|
||||||
"move all tables.")));
|
"move all tables.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (ddlJob != NULL)
|
||||||
|
{
|
||||||
|
ExecuteDistributedDDLJob(ddlJob);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -611,10 +612,11 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
|
||||||
* The function returns the IndexStmt node for the command to be executed on the
|
* The function returns the IndexStmt node for the command to be executed on the
|
||||||
* master node table.
|
* master node table.
|
||||||
*/
|
*/
|
||||||
static Node *
|
static DDLJob *
|
||||||
PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand,
|
PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand)
|
||||||
bool isTopLevel)
|
|
||||||
{
|
{
|
||||||
|
DDLJob *ddlJob = NULL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We first check whether a distributed relation is affected. For that, we need to
|
* We first check whether a distributed relation is affected. For that, we need to
|
||||||
* open the relation. To prevent race conditions with later lookups, lock the table,
|
* open the relation. To prevent race conditions with later lookups, lock the table,
|
||||||
|
@ -669,12 +671,10 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand,
|
||||||
/* if index does not exist, send the command to workers */
|
/* if index does not exist, send the command to workers */
|
||||||
if (!OidIsValid(indexRelationId))
|
if (!OidIsValid(indexRelationId))
|
||||||
{
|
{
|
||||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
ddlJob = palloc0(sizeof(DDLJob));
|
||||||
ddlJob->targetRelationId = relationId;
|
ddlJob->targetRelationId = relationId;
|
||||||
ddlJob->commandString = createIndexCommand;
|
ddlJob->commandString = createIndexCommand;
|
||||||
ddlJob->taskList = DDLTaskList(relationId, createIndexCommand);
|
ddlJob->taskList = DDLTaskList(relationId, createIndexCommand);
|
||||||
|
|
||||||
ExecuteDistributedDDLJob(ddlJob);
|
|
||||||
}
|
}
|
||||||
else if (!createIndexStatement->if_not_exists)
|
else if (!createIndexStatement->if_not_exists)
|
||||||
{
|
{
|
||||||
|
@ -685,7 +685,7 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return (Node *) createIndexStatement;
|
return ddlJob;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -697,10 +697,10 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand,
|
||||||
* The function returns the DropStmt node for the command to be executed on the
|
* The function returns the DropStmt node for the command to be executed on the
|
||||||
* master node table.
|
* master node table.
|
||||||
*/
|
*/
|
||||||
static Node *
|
static DDLJob *
|
||||||
PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand,
|
PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand)
|
||||||
bool isTopLevel)
|
|
||||||
{
|
{
|
||||||
|
DDLJob *ddlJob = NULL;
|
||||||
ListCell *dropObjectCell = NULL;
|
ListCell *dropObjectCell = NULL;
|
||||||
Oid distributedIndexId = InvalidOid;
|
Oid distributedIndexId = InvalidOid;
|
||||||
Oid distributedRelationId = InvalidOid;
|
Oid distributedRelationId = InvalidOid;
|
||||||
|
@ -765,7 +765,7 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand,
|
||||||
|
|
||||||
if (OidIsValid(distributedIndexId))
|
if (OidIsValid(distributedIndexId))
|
||||||
{
|
{
|
||||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
ddlJob = palloc0(sizeof(DDLJob));
|
||||||
|
|
||||||
ErrorIfUnsupportedDropIndexStmt(dropIndexStatement);
|
ErrorIfUnsupportedDropIndexStmt(dropIndexStatement);
|
||||||
|
|
||||||
|
@ -773,11 +773,9 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand,
|
||||||
ddlJob->targetRelationId = distributedRelationId;
|
ddlJob->targetRelationId = distributedRelationId;
|
||||||
ddlJob->commandString = dropIndexCommand;
|
ddlJob->commandString = dropIndexCommand;
|
||||||
ddlJob->taskList = DDLTaskList(distributedRelationId, dropIndexCommand);
|
ddlJob->taskList = DDLTaskList(distributedRelationId, dropIndexCommand);
|
||||||
|
|
||||||
ExecuteDistributedDDLJob(ddlJob);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return (Node *) dropIndexStatement;
|
return ddlJob;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -789,10 +787,10 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand,
|
||||||
* The function returns the AlterTableStmt node for the command to be executed on the
|
* The function returns the AlterTableStmt node for the command to be executed on the
|
||||||
* master node table.
|
* master node table.
|
||||||
*/
|
*/
|
||||||
static Node *
|
static DDLJob *
|
||||||
PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand,
|
PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand)
|
||||||
bool isTopLevel)
|
|
||||||
{
|
{
|
||||||
|
DDLJob *ddlJob = NULL;
|
||||||
LOCKMODE lockmode = 0;
|
LOCKMODE lockmode = 0;
|
||||||
Oid leftRelationId = InvalidOid;
|
Oid leftRelationId = InvalidOid;
|
||||||
Oid rightRelationId = InvalidOid;
|
Oid rightRelationId = InvalidOid;
|
||||||
|
@ -803,20 +801,20 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo
|
||||||
/* first check whether a distributed relation is affected */
|
/* first check whether a distributed relation is affected */
|
||||||
if (alterTableStatement->relation == NULL)
|
if (alterTableStatement->relation == NULL)
|
||||||
{
|
{
|
||||||
return (Node *) alterTableStatement;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
lockmode = AlterTableGetLockLevel(alterTableStatement->cmds);
|
lockmode = AlterTableGetLockLevel(alterTableStatement->cmds);
|
||||||
leftRelationId = AlterTableLookupRelation(alterTableStatement, lockmode);
|
leftRelationId = AlterTableLookupRelation(alterTableStatement, lockmode);
|
||||||
if (!OidIsValid(leftRelationId))
|
if (!OidIsValid(leftRelationId))
|
||||||
{
|
{
|
||||||
return (Node *) alterTableStatement;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
isDistributedRelation = IsDistributedTable(leftRelationId);
|
isDistributedRelation = IsDistributedTable(leftRelationId);
|
||||||
if (!isDistributedRelation)
|
if (!isDistributedRelation)
|
||||||
{
|
{
|
||||||
return (Node *) alterTableStatement;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
ErrorIfUnsupportedAlterTableStmt(alterTableStatement);
|
ErrorIfUnsupportedAlterTableStmt(alterTableStatement);
|
||||||
|
@ -861,27 +859,21 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ddlJob = palloc0(sizeof(DDLJob));
|
||||||
|
ddlJob->targetRelationId = leftRelationId;
|
||||||
|
ddlJob->commandString = alterTableCommand;
|
||||||
|
|
||||||
if (rightRelationId)
|
if (rightRelationId)
|
||||||
{
|
{
|
||||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
|
||||||
ddlJob->targetRelationId = leftRelationId;
|
|
||||||
ddlJob->commandString = alterTableCommand;
|
|
||||||
ddlJob->taskList = ForeignKeyTaskList(leftRelationId, rightRelationId,
|
ddlJob->taskList = ForeignKeyTaskList(leftRelationId, rightRelationId,
|
||||||
alterTableCommand);
|
alterTableCommand);
|
||||||
|
|
||||||
ExecuteDistributedDDLJob(ddlJob);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
|
||||||
ddlJob->targetRelationId = leftRelationId;
|
|
||||||
ddlJob->commandString = alterTableCommand;
|
|
||||||
ddlJob->taskList = DDLTaskList(leftRelationId, alterTableCommand);
|
ddlJob->taskList = DDLTaskList(leftRelationId, alterTableCommand);
|
||||||
|
|
||||||
ExecuteDistributedDDLJob(ddlJob);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return (Node *) alterTableStatement;
|
return ddlJob;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -953,16 +945,16 @@ WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
|
||||||
* or not. If it does, then it checks whether given object is a table. If it is, we warn
|
* or not. If it does, then it checks whether given object is a table. If it is, we warn
|
||||||
* out, since we do not support ALTER ... SET SCHEMA
|
* out, since we do not support ALTER ... SET SCHEMA
|
||||||
*/
|
*/
|
||||||
static Node *
|
static DDLJob *
|
||||||
PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
||||||
const char *alterObjectSchemaCommand, bool isTopLevel)
|
const char *alterObjectSchemaCommand)
|
||||||
{
|
{
|
||||||
Oid relationId = InvalidOid;
|
Oid relationId = InvalidOid;
|
||||||
bool noWait = false;
|
bool noWait = false;
|
||||||
|
|
||||||
if (alterObjectSchemaStmt->relation == NULL)
|
if (alterObjectSchemaStmt->relation == NULL)
|
||||||
{
|
{
|
||||||
return (Node *) alterObjectSchemaStmt;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
relationId = RangeVarGetRelidExtended(alterObjectSchemaStmt->relation,
|
relationId = RangeVarGetRelidExtended(alterObjectSchemaStmt->relation,
|
||||||
|
@ -973,7 +965,7 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
||||||
/* first check whether a distributed relation is affected */
|
/* first check whether a distributed relation is affected */
|
||||||
if (!OidIsValid(relationId) || !IsDistributedTable(relationId))
|
if (!OidIsValid(relationId) || !IsDistributedTable(relationId))
|
||||||
{
|
{
|
||||||
return (Node *) alterObjectSchemaStmt;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* warn out if a distributed relation is affected */
|
/* warn out if a distributed relation is affected */
|
||||||
|
@ -982,7 +974,7 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
||||||
errhint("Connect to worker nodes directly to manually "
|
errhint("Connect to worker nodes directly to manually "
|
||||||
"change schemas of affected objects.")));
|
"change schemas of affected objects.")));
|
||||||
|
|
||||||
return (Node *) alterObjectSchemaStmt;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue