diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index fc5fb0f5b..c04fdb895 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -111,7 +111,9 @@ static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSch bool isTopLevel); static Node * ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand, bool isTopLevel); -static List * VacuumTaskList(Oid relationId, const char *commandString); +static List * VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt); +static StringInfo DeparseVacuumStmtPrefix(VacuumStmt *vacuumStmt); + /* Local functions forward declarations for unsupported command checks */ static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement); @@ -916,7 +918,7 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand, bool isTopL return (Node *) vacuumStmt; } - taskList = VacuumTaskList(relationId, vacuumCommand); + taskList = VacuumTaskList(relationId, vacuumStmt); return (Node *) vacuumStmt; } @@ -924,14 +926,47 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand, bool isTopL /* TODO: Write function comments */ static List * -VacuumTaskList(Oid relationId, const char *commandString) +VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt) { List *taskList = NIL; List *shardIntervalList = LoadShardIntervalList(relationId); + ListCell *shardIntervalCell = NULL; + uint64 jobId = INVALID_JOB_ID; + int taskId = 1; + StringInfo vacuumString = DeparseVacuumStmtPrefix(vacuumStmt); + const int vacuumPrefixLen = vacuumString->len; + Oid schemaId = get_rel_namespace(relationId); + char *schemaName = get_namespace_name(schemaId); + char *tableName = get_rel_name(relationId); /* lock metadata before getting placement lists */ LockShardListMetadata(shardIntervalList, ExclusiveLock); + foreach(shardIntervalCell, shardIntervalList) + { + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); + uint64 shardId = shardInterval->shardId; + Task *task = NULL; + + char *shardName = pstrdup(tableName); + AppendShardIdToName(&shardName, shardInterval->shardId); + shardName = quote_qualified_identifier(schemaName, shardName); + + vacuumString->len = vacuumPrefixLen; + appendStringInfoString(vacuumString, shardName); + + task = CitusMakeNode(Task); + task->jobId = jobId; + task->taskId = taskId++; + task->taskType = SQL_TASK; + task->queryString = pstrdup(vacuumString->data); + task->dependedTaskList = NULL; + task->anchorShardId = shardId; + task->taskPlacementList = FinalizedShardPlacementList(shardId); + + taskList = lappend(taskList, task); + } + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("VACUUM of distributed tables is not supported"))); @@ -939,6 +974,54 @@ VacuumTaskList(Oid relationId, const char *commandString) } +/* TODO: Write function comments */ +static StringInfo +DeparseVacuumStmtPrefix(VacuumStmt *vacuumStmt) +{ + StringInfo vacuumPrefix = makeStringInfo(); + const int supportedFlags = ( +#if (PG_VERSION_NUM >= 90600) + VACOPT_DISABLE_PAGE_SKIPPING | +#endif + VACOPT_FREEZE | + VACOPT_FULL + ); + const int vacuumFlags = vacuumStmt->options; + + appendStringInfoString(vacuumPrefix, "VACUUM "); + + if (!(vacuumFlags & supportedFlags)) + { + return vacuumPrefix; + } + + appendStringInfoChar(vacuumPrefix, '('); + + if (vacuumFlags & VACOPT_FREEZE) + { + appendStringInfoString(vacuumPrefix, "FREEZE,"); + } + + if (vacuumFlags & VACOPT_FULL) + { + appendStringInfoString(vacuumPrefix, "FULL,"); + } + +#if (PG_VERSION_NUM >= 90600) + if (vacuumFlags & VACOPT_DISABLE_PAGE_SKIPPING) + { + appendStringInfoString(vacuumPrefix, "DISABLE_PAGE_SKIPPING,"); + } +#endif + + vacuumPrefix->data[vacuumPrefix->len - 1] = ')'; + + appendStringInfoChar(vacuumPrefix, ' '); + + return vacuumPrefix; +} + + /* * ErrorIfUnsupportedIndexStmt checks if the corresponding index statement is * supported for distributed tables and errors out if it is not.