mirror of https://github.com/citusdata/citus.git
Generate VACUUM tasks and build task list
The StringInfo twiddling might be a little unorthodox, but it seemed weird to regenerate the beginning of each VACUUM statement.pull/1013/head
parent
53b020cb1d
commit
3189c2256d
|
@ -111,7 +111,9 @@ static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSch
|
||||||
bool isTopLevel);
|
bool isTopLevel);
|
||||||
static Node * ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand,
|
static Node * ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand,
|
||||||
bool isTopLevel);
|
bool isTopLevel);
|
||||||
static List * VacuumTaskList(Oid relationId, const char *commandString);
|
static List * VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt);
|
||||||
|
static StringInfo DeparseVacuumStmtPrefix(VacuumStmt *vacuumStmt);
|
||||||
|
|
||||||
|
|
||||||
/* Local functions forward declarations for unsupported command checks */
|
/* Local functions forward declarations for unsupported command checks */
|
||||||
static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement);
|
static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement);
|
||||||
|
@ -916,7 +918,7 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand, bool isTopL
|
||||||
return (Node *) vacuumStmt;
|
return (Node *) vacuumStmt;
|
||||||
}
|
}
|
||||||
|
|
||||||
taskList = VacuumTaskList(relationId, vacuumCommand);
|
taskList = VacuumTaskList(relationId, vacuumStmt);
|
||||||
|
|
||||||
return (Node *) vacuumStmt;
|
return (Node *) vacuumStmt;
|
||||||
}
|
}
|
||||||
|
@ -924,14 +926,47 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand, bool isTopL
|
||||||
|
|
||||||
/* TODO: Write function comments */
|
/* TODO: Write function comments */
|
||||||
static List *
|
static List *
|
||||||
VacuumTaskList(Oid relationId, const char *commandString)
|
VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt)
|
||||||
{
|
{
|
||||||
List *taskList = NIL;
|
List *taskList = NIL;
|
||||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||||
|
ListCell *shardIntervalCell = NULL;
|
||||||
|
uint64 jobId = INVALID_JOB_ID;
|
||||||
|
int taskId = 1;
|
||||||
|
StringInfo vacuumString = DeparseVacuumStmtPrefix(vacuumStmt);
|
||||||
|
const int vacuumPrefixLen = vacuumString->len;
|
||||||
|
Oid schemaId = get_rel_namespace(relationId);
|
||||||
|
char *schemaName = get_namespace_name(schemaId);
|
||||||
|
char *tableName = get_rel_name(relationId);
|
||||||
|
|
||||||
/* lock metadata before getting placement lists */
|
/* lock metadata before getting placement lists */
|
||||||
LockShardListMetadata(shardIntervalList, ExclusiveLock);
|
LockShardListMetadata(shardIntervalList, ExclusiveLock);
|
||||||
|
|
||||||
|
foreach(shardIntervalCell, shardIntervalList)
|
||||||
|
{
|
||||||
|
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
|
||||||
|
uint64 shardId = shardInterval->shardId;
|
||||||
|
Task *task = NULL;
|
||||||
|
|
||||||
|
char *shardName = pstrdup(tableName);
|
||||||
|
AppendShardIdToName(&shardName, shardInterval->shardId);
|
||||||
|
shardName = quote_qualified_identifier(schemaName, shardName);
|
||||||
|
|
||||||
|
vacuumString->len = vacuumPrefixLen;
|
||||||
|
appendStringInfoString(vacuumString, shardName);
|
||||||
|
|
||||||
|
task = CitusMakeNode(Task);
|
||||||
|
task->jobId = jobId;
|
||||||
|
task->taskId = taskId++;
|
||||||
|
task->taskType = SQL_TASK;
|
||||||
|
task->queryString = pstrdup(vacuumString->data);
|
||||||
|
task->dependedTaskList = NULL;
|
||||||
|
task->anchorShardId = shardId;
|
||||||
|
task->taskPlacementList = FinalizedShardPlacementList(shardId);
|
||||||
|
|
||||||
|
taskList = lappend(taskList, task);
|
||||||
|
}
|
||||||
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("VACUUM of distributed tables is not supported")));
|
errmsg("VACUUM of distributed tables is not supported")));
|
||||||
|
|
||||||
|
@ -939,6 +974,54 @@ VacuumTaskList(Oid relationId, const char *commandString)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* TODO: Write function comments */
|
||||||
|
static StringInfo
|
||||||
|
DeparseVacuumStmtPrefix(VacuumStmt *vacuumStmt)
|
||||||
|
{
|
||||||
|
StringInfo vacuumPrefix = makeStringInfo();
|
||||||
|
const int supportedFlags = (
|
||||||
|
#if (PG_VERSION_NUM >= 90600)
|
||||||
|
VACOPT_DISABLE_PAGE_SKIPPING |
|
||||||
|
#endif
|
||||||
|
VACOPT_FREEZE |
|
||||||
|
VACOPT_FULL
|
||||||
|
);
|
||||||
|
const int vacuumFlags = vacuumStmt->options;
|
||||||
|
|
||||||
|
appendStringInfoString(vacuumPrefix, "VACUUM ");
|
||||||
|
|
||||||
|
if (!(vacuumFlags & supportedFlags))
|
||||||
|
{
|
||||||
|
return vacuumPrefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
appendStringInfoChar(vacuumPrefix, '(');
|
||||||
|
|
||||||
|
if (vacuumFlags & VACOPT_FREEZE)
|
||||||
|
{
|
||||||
|
appendStringInfoString(vacuumPrefix, "FREEZE,");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (vacuumFlags & VACOPT_FULL)
|
||||||
|
{
|
||||||
|
appendStringInfoString(vacuumPrefix, "FULL,");
|
||||||
|
}
|
||||||
|
|
||||||
|
#if (PG_VERSION_NUM >= 90600)
|
||||||
|
if (vacuumFlags & VACOPT_DISABLE_PAGE_SKIPPING)
|
||||||
|
{
|
||||||
|
appendStringInfoString(vacuumPrefix, "DISABLE_PAGE_SKIPPING,");
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
vacuumPrefix->data[vacuumPrefix->len - 1] = ')';
|
||||||
|
|
||||||
|
appendStringInfoChar(vacuumPrefix, ' ');
|
||||||
|
|
||||||
|
return vacuumPrefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ErrorIfUnsupportedIndexStmt checks if the corresponding index statement is
|
* ErrorIfUnsupportedIndexStmt checks if the corresponding index statement is
|
||||||
* supported for distributed tables and errors out if it is not.
|
* supported for distributed tables and errors out if it is not.
|
||||||
|
|
Loading…
Reference in New Issue