Detect dist. VACUUM, take locks, error

Setting up the skeleton of the logic.
pull/1013/head
Jason Petersen 2016-12-02 11:19:52 -07:00
parent 257d9b49fd
commit 53b020cb1d
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
1 changed files with 57 additions and 0 deletions

View File

@ -109,6 +109,9 @@ static Node * ProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
const char *alterObjectSchemaCommand,
bool isTopLevel);
static Node * ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand,
bool isTopLevel);
static List * VacuumTaskList(Oid relationId, const char *commandString);
/* Local functions forward declarations for unsupported command checks */
static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement);
@ -281,6 +284,17 @@ multi_ProcessUtility(Node *parsetree,
errhint("Connect to worker nodes directly to manually "
"move all tables.")));
}
if (IsA(parsetree, VacuumStmt))
{
VacuumStmt *vacuumStmt = (VacuumStmt *) parsetree;
/* must check fields to know whether actually a vacuum */
if (vacuumStmt->options | VACOPT_VACUUM)
{
parsetree = ProcessVacuumStmt(vacuumStmt, queryString, isTopLevel);
}
}
}
/*
@ -882,6 +896,49 @@ ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
}
/* TODO: Write function comments */
static Node *
ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand, bool isTopLevel)
{
Oid relationId = InvalidOid;
List *taskList = NIL;
if (vacuumStmt->relation == NULL)
{
return (Node *) vacuumStmt;
}
relationId = RangeVarGetRelid(vacuumStmt->relation, NoLock, false);
/* first check whether a distributed relation is affected */
if (!OidIsValid(relationId) || !IsDistributedTable(relationId))
{
return (Node *) vacuumStmt;
}
taskList = VacuumTaskList(relationId, vacuumCommand);
return (Node *) vacuumStmt;
}
/* TODO: Write function comments */
static List *
VacuumTaskList(Oid relationId, const char *commandString)
{
List *taskList = NIL;
List *shardIntervalList = LoadShardIntervalList(relationId);
/* lock metadata before getting placement lists */
LockShardListMetadata(shardIntervalList, ExclusiveLock);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("VACUUM of distributed tables is not supported")));
return taskList;
}
/*
* ErrorIfUnsupportedIndexStmt checks if the corresponding index statement is
* supported for distributed tables and errors out if it is not.