Unify error handling, clean up ANALYZE flow

pull/1013/head
Jason Petersen 2016-12-13 15:58:55 -07:00
parent e80d7f5741
commit 6bb916cc4f
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
1 changed files with 69 additions and 13 deletions

View File

@ -111,6 +111,7 @@ static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSch
const char *alterObjectSchemaCommand, const char *alterObjectSchemaCommand,
bool isTopLevel); bool isTopLevel);
static void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand); static void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);
static bool IsSupportedDistributedVacuumStmt(Oid relationId, VacuumStmt *vacuumStmt);
static List * VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt); static List * VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt);
static StringInfo DeparseVacuumStmtPrefix(VacuumStmt *vacuumStmt); static StringInfo DeparseVacuumStmtPrefix(VacuumStmt *vacuumStmt);
@ -901,16 +902,15 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
{ {
Oid relationId = InvalidOid; Oid relationId = InvalidOid;
List *taskList = NIL; List *taskList = NIL;
bool supportedVacuumStmt = false;
if (vacuumStmt->relation == NULL) if (vacuumStmt->relation != NULL)
{ {
return; relationId = RangeVarGetRelid(vacuumStmt->relation, NoLock, false);
} }
relationId = RangeVarGetRelid(vacuumStmt->relation, NoLock, false); supportedVacuumStmt = IsSupportedDistributedVacuumStmt(relationId, vacuumStmt);
if (!supportedVacuumStmt)
/* first check whether a distributed relation is affected */
if (!OidIsValid(relationId) || !IsDistributedTable(relationId))
{ {
return; return;
} }
@ -923,6 +923,53 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
} }
static bool
IsSupportedDistributedVacuumStmt(Oid relationId, VacuumStmt *vacuumStmt)
{
const char *stmtName = (vacuumStmt->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
if (vacuumStmt->relation == NULL && EnableDDLPropagation)
{
/* WARN and exit early for local unqualified VACUUM commands */
ereport(WARNING, (errmsg("not propagating %s command to worker nodes", stmtName),
errhint("Provide a specific table in order to %s "
"distributed tables.", stmtName)));
return false;
}
if (!OidIsValid(relationId) || !IsDistributedTable(relationId))
{
return false;
}
if (!EnableDDLPropagation)
{
/* WARN and exit early if DDL propagation is not enabled */
ereport(WARNING, (errmsg("not propagating %s command to worker nodes", stmtName),
errhint("Set citus.enable_ddl_propagation to true in order to "
"send targeted %s commands to worker nodes.",
stmtName)));
}
if (vacuumStmt->options & VACOPT_VERBOSE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("the VERBOSE option is currently unsupported in "
"distributed %s commands", stmtName)));
}
if (vacuumStmt->va_cols != NIL)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("specifying a column list is currently unsupported in "
"distributed %s commands", stmtName)));
}
return true;
}
/* TODO: Write function comments */ /* TODO: Write function comments */
static List * static List *
VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt) VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt)
@ -975,7 +1022,8 @@ static StringInfo
DeparseVacuumStmtPrefix(VacuumStmt *vacuumStmt) DeparseVacuumStmtPrefix(VacuumStmt *vacuumStmt)
{ {
StringInfo vacuumPrefix = makeStringInfo(); StringInfo vacuumPrefix = makeStringInfo();
const int supportedFlags = ( int vacuumFlags = vacuumStmt->options;
const int unsupportedFlags = ~(
VACOPT_ANALYZE | VACOPT_ANALYZE |
#if (PG_VERSION_NUM >= 90600) #if (PG_VERSION_NUM >= 90600)
VACOPT_DISABLE_PAGE_SKIPPING | VACOPT_DISABLE_PAGE_SKIPPING |
@ -983,22 +1031,30 @@ DeparseVacuumStmtPrefix(VacuumStmt *vacuumStmt)
VACOPT_FREEZE | VACOPT_FREEZE |
VACOPT_FULL VACOPT_FULL
); );
const int vacuumFlags = vacuumStmt->options;
if (!(vacuumStmt->options & VACOPT_VACUUM)) /* determine actual command and block out its bit */
if (vacuumFlags & VACOPT_VACUUM)
{
appendStringInfoString(vacuumPrefix, "VACUUM ");
vacuumFlags &= ~VACOPT_VACUUM;
}
else
{ {
appendStringInfoString(vacuumPrefix, "ANALYZE "); appendStringInfoString(vacuumPrefix, "ANALYZE ");
vacuumFlags &= ~VACOPT_ANALYZE;
return vacuumPrefix;
} }
appendStringInfoString(vacuumPrefix, "VACUUM "); /* unsupported flags and options should have already been rejected */
Assert((vacuumFlags & unsupportedFlags) == 0);
Assert(vacuumStmt->va_cols == NIL);
if (!(vacuumFlags & supportedFlags)) /* if no flags remain, exit early */
if (vacuumFlags == 0)
{ {
return vacuumPrefix; return vacuumPrefix;
} }
/* otherwise, handle options */
appendStringInfoChar(vacuumPrefix, '('); appendStringInfoChar(vacuumPrefix, '(');
if (vacuumFlags & VACOPT_ANALYZE) if (vacuumFlags & VACOPT_ANALYZE)