From 9643ff580ed3960bd73b5e51c0554a6ffce5639e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 8 Aug 2019 21:46:06 +0000 Subject: [PATCH] Update commands/vacuum.c with pg12 changes Adds support for SKIP_LOCKED, INDEX_CLEANUP, TRUNCATE Removes broken assert --- src/backend/distributed/commands/vacuum.c | 195 +++++++++++++++++++--- 1 file changed, 169 insertions(+), 26 deletions(-) diff --git a/src/backend/distributed/commands/vacuum.c b/src/backend/distributed/commands/vacuum.c index 3eb20f58a..e779242e6 100644 --- a/src/backend/distributed/commands/vacuum.c +++ b/src/backend/distributed/commands/vacuum.c @@ -9,8 +9,11 @@ */ #include "postgres.h" -#include "c.h" +#if PG_VERSION_NUM >= 120000 +#include "commands/defrem.h" +#endif +#include "commands/vacuum.h" #include "distributed/commands.h" #include "distributed/commands/utility_hook.h" #include "distributed/metadata_cache.h" @@ -22,13 +25,26 @@ #include "utils/builtins.h" #include "utils/lsyscache.h" +/* + * Subset of VacuumParams we care about + */ +typedef struct CitusVacuumParams +{ + int options; +#if PG_VERSION_NUM >= 120000 + VacOptTernaryValue truncate; + VacOptTernaryValue index_cleanup; +#endif +} CitusVacuumParams; + /* Local functions forward declarations for processing distributed table commands */ -static bool IsDistributedVacuumStmt(VacuumStmt *vacuumStmt, List *vacuumRelationIdList); -static List * VacuumTaskList(Oid relationId, int vacuumOptions, List *vacuumColumnList); -static StringInfo DeparseVacuumStmtPrefix(int vacuumFlags); +static bool IsDistributedVacuumStmt(int vacuumOptions, List *vacuumRelationIdList); +static List * VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, + List *vacuumColumnList); +static StringInfo DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams); static char * DeparseVacuumColumnNames(List *columnNameList); - +static CitusVacuumParams VacuumStmtParams(VacuumStmt *vacstmt); /* * ProcessVacuumStmt processes vacuum statements that may need propagation to @@ -49,7 +65,8 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand) ListCell *vacuumRelationCell = NULL; List *relationIdList = NIL; ListCell *relationIdCell = NULL; - LOCKMODE lockMode = (vacuumStmt->options & VACOPT_FULL) ? AccessExclusiveLock : + CitusVacuumParams vacuumParams = VacuumStmtParams(vacuumStmt); + LOCKMODE lockMode = (vacuumParams.options & VACOPT_FULL) ? AccessExclusiveLock : ShareUpdateExclusiveLock; int executedVacuumCount = 0; @@ -60,7 +77,7 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand) relationIdList = lappend_oid(relationIdList, relationId); } - distributedVacuumStmt = IsDistributedVacuumStmt(vacuumStmt, relationIdList); + distributedVacuumStmt = IsDistributedVacuumStmt(vacuumParams.options, relationIdList); if (!distributedVacuumStmt) { return; @@ -81,7 +98,7 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand) * commands can run inside a transaction block. Notice that we do this * once even if there are multiple distributed tables to be vacuumed. */ - if (executedVacuumCount == 0 && (vacuumStmt->options & VACOPT_VACUUM) != 0) + if (executedVacuumCount == 0 && (vacuumParams.options & VACOPT_VACUUM) != 0) { /* save old commit protocol to restore at xact end */ Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE); @@ -90,7 +107,7 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand) } vacuumColumnList = VacuumColumnList(vacuumStmt, relationIndex); - taskList = VacuumTaskList(relationId, vacuumStmt->options, vacuumColumnList); + taskList = VacuumTaskList(relationId, vacuumParams, vacuumColumnList); /* use adaptive executor when enabled */ ExecuteUtilityTaskListWithoutResults(taskList); @@ -110,9 +127,9 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand) * false otherwise. */ static bool -IsDistributedVacuumStmt(VacuumStmt *vacuumStmt, List *vacuumRelationIdList) +IsDistributedVacuumStmt(int vacuumOptions, List *vacuumRelationIdList) { - const char *stmtName = (vacuumStmt->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE"; + const char *stmtName = (vacuumOptions & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE"; bool distributeStmt = false; ListCell *relationIdCell = NULL; int distributedRelationCount = 0; @@ -166,14 +183,14 @@ IsDistributedVacuumStmt(VacuumStmt *vacuumStmt, List *vacuumRelationIdList) * a VacuumStmt which targets a distributed relation. */ static List * -VacuumTaskList(Oid relationId, int vacuumOptions, List *vacuumColumnList) +VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColumnList) { List *taskList = NIL; List *shardIntervalList = NIL; ListCell *shardIntervalCell = NULL; uint64 jobId = INVALID_JOB_ID; int taskId = 1; - StringInfo vacuumString = DeparseVacuumStmtPrefix(vacuumOptions); + StringInfo vacuumString = DeparseVacuumStmtPrefix(vacuumParams); const char *columnNames = NULL; const int vacuumPrefixLen = vacuumString->len; Oid schemaId = get_rel_namespace(relationId); @@ -233,18 +250,12 @@ VacuumTaskList(Oid relationId, int vacuumOptions, List *vacuumColumnList) * statements. */ static StringInfo -DeparseVacuumStmtPrefix(int vacuumFlags) +DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams) { + int vacuumFlags = vacuumParams.options; StringInfo vacuumPrefix = makeStringInfo(); - const int unsupportedFlags PG_USED_FOR_ASSERTS_ONLY = ~( - VACOPT_ANALYZE | - VACOPT_DISABLE_PAGE_SKIPPING | - VACOPT_FREEZE | - VACOPT_FULL | - VACOPT_VERBOSE - ); - /* determine actual command and block out its bit */ + /* determine actual command and block out its bits */ if (vacuumFlags & VACOPT_VACUUM) { appendStringInfoString(vacuumPrefix, "VACUUM "); @@ -252,6 +263,8 @@ DeparseVacuumStmtPrefix(int vacuumFlags) } else { + Assert((vacuumFlags & VACOPT_ANALYZE) != 0); + appendStringInfoString(vacuumPrefix, "ANALYZE "); vacuumFlags &= ~VACOPT_ANALYZE; @@ -262,11 +275,13 @@ DeparseVacuumStmtPrefix(int vacuumFlags) } } - /* unsupported flags should have already been rejected */ - Assert((vacuumFlags & unsupportedFlags) == 0); - /* if no flags remain, exit early */ - if (vacuumFlags == 0) + if (vacuumFlags == 0 +#if PG_VERSION_NUM >= 120000 + && vacuumParams.truncate == VACOPT_TERNARY_DEFAULT && + vacuumParams.index_cleanup == VACOPT_TERNARY_DEFAULT +#endif + ) { return vacuumPrefix; } @@ -299,6 +314,29 @@ DeparseVacuumStmtPrefix(int vacuumFlags) appendStringInfoString(vacuumPrefix, "VERBOSE,"); } +#if PG_VERSION_NUM >= 120000 + if (vacuumFlags & VACOPT_SKIP_LOCKED) + { + appendStringInfoString(vacuumPrefix, "SKIP_LOCKED,"); + } + + if (vacuumParams.truncate != VACOPT_TERNARY_DEFAULT) + { + appendStringInfoString(vacuumPrefix, + vacuumParams.truncate == VACOPT_TERNARY_ENABLED ? + "TRUNCATE," : "TRUNCATE false," + ); + } + + if (vacuumParams.index_cleanup != VACOPT_TERNARY_DEFAULT) + { + appendStringInfoString(vacuumPrefix, + vacuumParams.index_cleanup == VACOPT_TERNARY_ENABLED ? + "INDEX_CLEANUP," : "INDEX_CLEANUP false," + ); + } +#endif + vacuumPrefix->data[vacuumPrefix->len - 1] = ')'; appendStringInfoChar(vacuumPrefix, ' '); @@ -339,3 +377,108 @@ DeparseVacuumColumnNames(List *columnNameList) return columnNames->data; } + + +/* + * VacuumStmtParams returns a CitusVacuumParams based on the supplied VacuumStmt. + */ +#if PG_VERSION_NUM >= 120000 + +/* + * This is mostly ExecVacuum from Postgres's commands/vacuum.c + */ +static CitusVacuumParams +VacuumStmtParams(VacuumStmt *vacstmt) +{ + CitusVacuumParams params; + bool verbose = false; + bool skip_locked = false; + bool analyze = false; + bool freeze = false; + bool full = false; + bool disable_page_skipping = false; + ListCell *lc; + + /* Set default value */ + params.index_cleanup = VACOPT_TERNARY_DEFAULT; + params.truncate = VACOPT_TERNARY_DEFAULT; + + /* Parse options list */ + foreach(lc, vacstmt->options) + { + DefElem *opt = (DefElem *) lfirst(lc); + + /* Parse common options for VACUUM and ANALYZE */ + if (strcmp(opt->defname, "verbose") == 0) + { + verbose = defGetBoolean(opt); + } + else if (strcmp(opt->defname, "skip_locked") == 0) + { + skip_locked = defGetBoolean(opt); + } + else if (!vacstmt->is_vacuumcmd) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("unrecognized ANALYZE option \"%s\"", opt->defname))); + } + + /* Parse options available on VACUUM */ + else if (strcmp(opt->defname, "analyze") == 0) + { + analyze = defGetBoolean(opt); + } + else if (strcmp(opt->defname, "freeze") == 0) + { + freeze = defGetBoolean(opt); + } + else if (strcmp(opt->defname, "full") == 0) + { + full = defGetBoolean(opt); + } + else if (strcmp(opt->defname, "disable_page_skipping") == 0) + { + disable_page_skipping = defGetBoolean(opt); + } + else if (strcmp(opt->defname, "index_cleanup") == 0) + { + params.index_cleanup = defGetBoolean(opt) ? VACOPT_TERNARY_ENABLED : + VACOPT_TERNARY_DISABLED; + } + else if (strcmp(opt->defname, "truncate") == 0) + { + params.truncate = defGetBoolean(opt) ? VACOPT_TERNARY_ENABLED : + VACOPT_TERNARY_DISABLED; + } + else + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("unrecognized VACUUM option \"%s\"", opt->defname) + )); + } + } + + params.options = (vacstmt->is_vacuumcmd ? VACOPT_VACUUM : VACOPT_ANALYZE) | + (verbose ? VACOPT_VERBOSE : 0) | + (skip_locked ? VACOPT_SKIP_LOCKED : 0) | + (analyze ? VACOPT_ANALYZE : 0) | + (freeze ? VACOPT_FREEZE : 0) | + (full ? VACOPT_FULL : 0) | + (disable_page_skipping ? VACOPT_DISABLE_PAGE_SKIPPING : 0); + return params; +} + + +#else +static CitusVacuumParams +VacuumStmtParams(VacuumStmt *vacuumStmt) +{ + CitusVacuumParams params; + params.options = vacuumStmt->options; + return params; +} + + +#endif