mirror of https://github.com/citusdata/citus.git
parent
42c799faee
commit
23f5e4282d
|
@ -101,16 +101,16 @@ static void VerifyTransmitStmt(CopyStmt *copyStatement);
|
|||
/* Local functions forward declarations for processing distributed table commands */
|
||||
static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
|
||||
bool *commandMustRunAsOwner);
|
||||
static DDLJob * PlanIndexStmt(IndexStmt *createIndexStatement,
|
||||
const char *createIndexCommand);
|
||||
static DDLJob * PlanDropIndexStmt(DropStmt *dropIndexStatement,
|
||||
const char *dropIndexCommand);
|
||||
static DDLJob * PlanAlterTableStmt(AlterTableStmt *alterTableStatement,
|
||||
const char *alterTableCommand);
|
||||
static List * PlanIndexStmt(IndexStmt *createIndexStatement,
|
||||
const char *createIndexCommand);
|
||||
static List * PlanDropIndexStmt(DropStmt *dropIndexStatement,
|
||||
const char *dropIndexCommand);
|
||||
static List * PlanAlterTableStmt(AlterTableStmt *alterTableStatement,
|
||||
const char *alterTableCommand);
|
||||
static Node * WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
|
||||
const char *alterTableCommand);
|
||||
static DDLJob * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
||||
const char *alterObjectSchemaCommand);
|
||||
static List * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
||||
const char *alterObjectSchemaCommand);
|
||||
static void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);
|
||||
static bool IsSupportedDistributedVacuumStmt(Oid relationId, VacuumStmt *vacuumStmt);
|
||||
static List * VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt);
|
||||
|
@ -168,7 +168,7 @@ multi_ProcessUtility(Node *parsetree,
|
|||
bool commandMustRunAsOwner = false;
|
||||
Oid savedUserId = InvalidOid;
|
||||
int savedSecurityContext = 0;
|
||||
DDLJob *ddlJob = NULL;
|
||||
List *ddlJobs = NIL;
|
||||
|
||||
if (IsA(parsetree, TransactionStmt))
|
||||
{
|
||||
|
@ -255,7 +255,7 @@ multi_ProcessUtility(Node *parsetree,
|
|||
{
|
||||
if (IsA(parsetree, IndexStmt))
|
||||
{
|
||||
ddlJob = PlanIndexStmt((IndexStmt *) parsetree, queryString);
|
||||
ddlJobs = PlanIndexStmt((IndexStmt *) parsetree, queryString);
|
||||
}
|
||||
|
||||
if (IsA(parsetree, DropStmt))
|
||||
|
@ -263,7 +263,7 @@ multi_ProcessUtility(Node *parsetree,
|
|||
DropStmt *dropStatement = (DropStmt *) parsetree;
|
||||
if (dropStatement->removeType == OBJECT_INDEX)
|
||||
{
|
||||
ddlJob = PlanDropIndexStmt(dropStatement, queryString);
|
||||
ddlJobs = PlanDropIndexStmt(dropStatement, queryString);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -272,7 +272,7 @@ multi_ProcessUtility(Node *parsetree,
|
|||
AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree;
|
||||
if (alterTableStmt->relkind == OBJECT_TABLE)
|
||||
{
|
||||
ddlJob = PlanAlterTableStmt(alterTableStmt, queryString);
|
||||
ddlJobs = PlanAlterTableStmt(alterTableStmt, queryString);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -296,7 +296,7 @@ multi_ProcessUtility(Node *parsetree,
|
|||
if (IsA(parsetree, AlterObjectSchemaStmt))
|
||||
{
|
||||
AlterObjectSchemaStmt *setSchemaStmt = (AlterObjectSchemaStmt *) parsetree;
|
||||
ddlJob = PlanAlterObjectSchemaStmt(setSchemaStmt, queryString);
|
||||
ddlJobs = PlanAlterObjectSchemaStmt(setSchemaStmt, queryString);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -375,9 +375,15 @@ multi_ProcessUtility(Node *parsetree,
|
|||
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
||||
}
|
||||
|
||||
if (ddlJob != NULL)
|
||||
if (ddlJobs != NIL)
|
||||
{
|
||||
ExecuteDistributedDDLJob(ddlJob);
|
||||
ListCell *ddlJobCell = NULL;
|
||||
|
||||
foreach(ddlJobCell, ddlJobs)
|
||||
{
|
||||
DDLJob *ddlJob = (DDLJob *) lfirst(ddlJobCell);
|
||||
ExecuteDistributedDDLJob(ddlJob);
|
||||
}
|
||||
}
|
||||
|
||||
/* we run VacuumStmt after standard hook to benefit from its checks and locking */
|
||||
|
@ -611,10 +617,10 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
|
|||
* The function returns the IndexStmt node for the command to be executed on the
|
||||
* master node table.
|
||||
*/
|
||||
static DDLJob *
|
||||
static List *
|
||||
PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand)
|
||||
{
|
||||
DDLJob *ddlJob = NULL;
|
||||
List *ddlJobs = NIL;
|
||||
|
||||
/*
|
||||
* We first check whether a distributed relation is affected. For that, we need to
|
||||
|
@ -670,10 +676,12 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand)
|
|||
/* if index does not exist, send the command to workers */
|
||||
if (!OidIsValid(indexRelationId))
|
||||
{
|
||||
ddlJob = palloc0(sizeof(DDLJob));
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
ddlJob->targetRelationId = relationId;
|
||||
ddlJob->commandString = createIndexCommand;
|
||||
ddlJob->taskList = DDLTaskList(relationId, createIndexCommand);
|
||||
|
||||
ddlJobs = list_make1(ddlJob);
|
||||
}
|
||||
else if (!createIndexStatement->if_not_exists)
|
||||
{
|
||||
|
@ -684,7 +692,7 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand)
|
|||
}
|
||||
}
|
||||
|
||||
return ddlJob;
|
||||
return ddlJobs;
|
||||
}
|
||||
|
||||
|
||||
|
@ -696,10 +704,10 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand)
|
|||
* The function returns the DropStmt node for the command to be executed on the
|
||||
* master node table.
|
||||
*/
|
||||
static DDLJob *
|
||||
static List *
|
||||
PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand)
|
||||
{
|
||||
DDLJob *ddlJob = NULL;
|
||||
List *ddlJobs = NIL;
|
||||
ListCell *dropObjectCell = NULL;
|
||||
Oid distributedIndexId = InvalidOid;
|
||||
Oid distributedRelationId = InvalidOid;
|
||||
|
@ -764,7 +772,7 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand)
|
|||
|
||||
if (OidIsValid(distributedIndexId))
|
||||
{
|
||||
ddlJob = palloc0(sizeof(DDLJob));
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
|
||||
ErrorIfUnsupportedDropIndexStmt(dropIndexStatement);
|
||||
|
||||
|
@ -772,9 +780,11 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand)
|
|||
ddlJob->targetRelationId = distributedRelationId;
|
||||
ddlJob->commandString = dropIndexCommand;
|
||||
ddlJob->taskList = DDLTaskList(distributedRelationId, dropIndexCommand);
|
||||
|
||||
ddlJobs = list_make1(ddlJob);
|
||||
}
|
||||
|
||||
return ddlJob;
|
||||
return ddlJobs;
|
||||
}
|
||||
|
||||
|
||||
|
@ -786,9 +796,10 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand)
|
|||
* The function returns the AlterTableStmt node for the command to be executed on the
|
||||
* master node table.
|
||||
*/
|
||||
static DDLJob *
|
||||
static List *
|
||||
PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand)
|
||||
{
|
||||
List *ddlJobs = NIL;
|
||||
DDLJob *ddlJob = NULL;
|
||||
LOCKMODE lockmode = 0;
|
||||
Oid leftRelationId = InvalidOid;
|
||||
|
@ -800,20 +811,20 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo
|
|||
/* first check whether a distributed relation is affected */
|
||||
if (alterTableStatement->relation == NULL)
|
||||
{
|
||||
return NULL;
|
||||
return NIL;
|
||||
}
|
||||
|
||||
lockmode = AlterTableGetLockLevel(alterTableStatement->cmds);
|
||||
leftRelationId = AlterTableLookupRelation(alterTableStatement, lockmode);
|
||||
if (!OidIsValid(leftRelationId))
|
||||
{
|
||||
return NULL;
|
||||
return NIL;
|
||||
}
|
||||
|
||||
isDistributedRelation = IsDistributedTable(leftRelationId);
|
||||
if (!isDistributedRelation)
|
||||
{
|
||||
return NULL;
|
||||
return NIL;
|
||||
}
|
||||
|
||||
ErrorIfUnsupportedAlterTableStmt(alterTableStatement);
|
||||
|
@ -872,7 +883,9 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo
|
|||
ddlJob->taskList = DDLTaskList(leftRelationId, alterTableCommand);
|
||||
}
|
||||
|
||||
return ddlJob;
|
||||
ddlJobs = list_make1(ddlJob);
|
||||
|
||||
return ddlJobs;
|
||||
}
|
||||
|
||||
|
||||
|
@ -944,7 +957,7 @@ WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
|
|||
* or not. If it does, then it checks whether given object is a table. If it is, we warn
|
||||
* out, since we do not support ALTER ... SET SCHEMA
|
||||
*/
|
||||
static DDLJob *
|
||||
static List *
|
||||
PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
||||
const char *alterObjectSchemaCommand)
|
||||
{
|
||||
|
@ -953,7 +966,7 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
|||
|
||||
if (alterObjectSchemaStmt->relation == NULL)
|
||||
{
|
||||
return NULL;
|
||||
return NIL;
|
||||
}
|
||||
|
||||
relationId = RangeVarGetRelidExtended(alterObjectSchemaStmt->relation,
|
||||
|
@ -964,7 +977,7 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
|||
/* first check whether a distributed relation is affected */
|
||||
if (!OidIsValid(relationId) || !IsDistributedTable(relationId))
|
||||
{
|
||||
return NULL;
|
||||
return NIL;
|
||||
}
|
||||
|
||||
/* warn out if a distributed relation is affected */
|
||||
|
@ -973,7 +986,7 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
|||
errhint("Connect to worker nodes directly to manually "
|
||||
"change schemas of affected objects.")));
|
||||
|
||||
return NULL;
|
||||
return NIL;
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue