Update commands/vacuum.c with pg12 changes

Adds support for SKIP_LOCKED, INDEX_CLEANUP, TRUNCATE
Removes broken assert
pull/2844/head
Philip Dubé 2019-08-08 21:46:06 +00:00
parent 68c4b71f93
commit 9643ff580e
1 changed files with 169 additions and 26 deletions

View File

@ -9,8 +9,11 @@
*/
#include "postgres.h"
#include "c.h"
#if PG_VERSION_NUM >= 120000
#include "commands/defrem.h"
#endif
#include "commands/vacuum.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/metadata_cache.h"
@ -22,13 +25,26 @@
#include "utils/builtins.h"
#include "utils/lsyscache.h"
/*
* Subset of VacuumParams we care about
*/
typedef struct CitusVacuumParams
{
int options;
#if PG_VERSION_NUM >= 120000
VacOptTernaryValue truncate;
VacOptTernaryValue index_cleanup;
#endif
} CitusVacuumParams;
/* Local functions forward declarations for processing distributed table commands */
static bool IsDistributedVacuumStmt(VacuumStmt *vacuumStmt, List *vacuumRelationIdList);
static List * VacuumTaskList(Oid relationId, int vacuumOptions, List *vacuumColumnList);
static StringInfo DeparseVacuumStmtPrefix(int vacuumFlags);
static bool IsDistributedVacuumStmt(int vacuumOptions, List *vacuumRelationIdList);
static List * VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams,
List *vacuumColumnList);
static StringInfo DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams);
static char * DeparseVacuumColumnNames(List *columnNameList);
static CitusVacuumParams VacuumStmtParams(VacuumStmt *vacstmt);
/*
* ProcessVacuumStmt processes vacuum statements that may need propagation to
@ -49,7 +65,8 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
ListCell *vacuumRelationCell = NULL;
List *relationIdList = NIL;
ListCell *relationIdCell = NULL;
LOCKMODE lockMode = (vacuumStmt->options & VACOPT_FULL) ? AccessExclusiveLock :
CitusVacuumParams vacuumParams = VacuumStmtParams(vacuumStmt);
LOCKMODE lockMode = (vacuumParams.options & VACOPT_FULL) ? AccessExclusiveLock :
ShareUpdateExclusiveLock;
int executedVacuumCount = 0;
@ -60,7 +77,7 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
relationIdList = lappend_oid(relationIdList, relationId);
}
distributedVacuumStmt = IsDistributedVacuumStmt(vacuumStmt, relationIdList);
distributedVacuumStmt = IsDistributedVacuumStmt(vacuumParams.options, relationIdList);
if (!distributedVacuumStmt)
{
return;
@ -81,7 +98,7 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
* commands can run inside a transaction block. Notice that we do this
* once even if there are multiple distributed tables to be vacuumed.
*/
if (executedVacuumCount == 0 && (vacuumStmt->options & VACOPT_VACUUM) != 0)
if (executedVacuumCount == 0 && (vacuumParams.options & VACOPT_VACUUM) != 0)
{
/* save old commit protocol to restore at xact end */
Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE);
@ -90,7 +107,7 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
}
vacuumColumnList = VacuumColumnList(vacuumStmt, relationIndex);
taskList = VacuumTaskList(relationId, vacuumStmt->options, vacuumColumnList);
taskList = VacuumTaskList(relationId, vacuumParams, vacuumColumnList);
/* use adaptive executor when enabled */
ExecuteUtilityTaskListWithoutResults(taskList);
@ -110,9 +127,9 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
* false otherwise.
*/
static bool
IsDistributedVacuumStmt(VacuumStmt *vacuumStmt, List *vacuumRelationIdList)
IsDistributedVacuumStmt(int vacuumOptions, List *vacuumRelationIdList)
{
const char *stmtName = (vacuumStmt->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
const char *stmtName = (vacuumOptions & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
bool distributeStmt = false;
ListCell *relationIdCell = NULL;
int distributedRelationCount = 0;
@ -166,14 +183,14 @@ IsDistributedVacuumStmt(VacuumStmt *vacuumStmt, List *vacuumRelationIdList)
* a VacuumStmt which targets a distributed relation.
*/
static List *
VacuumTaskList(Oid relationId, int vacuumOptions, List *vacuumColumnList)
VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColumnList)
{
List *taskList = NIL;
List *shardIntervalList = NIL;
ListCell *shardIntervalCell = NULL;
uint64 jobId = INVALID_JOB_ID;
int taskId = 1;
StringInfo vacuumString = DeparseVacuumStmtPrefix(vacuumOptions);
StringInfo vacuumString = DeparseVacuumStmtPrefix(vacuumParams);
const char *columnNames = NULL;
const int vacuumPrefixLen = vacuumString->len;
Oid schemaId = get_rel_namespace(relationId);
@ -233,18 +250,12 @@ VacuumTaskList(Oid relationId, int vacuumOptions, List *vacuumColumnList)
* statements.
*/
static StringInfo
DeparseVacuumStmtPrefix(int vacuumFlags)
DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams)
{
int vacuumFlags = vacuumParams.options;
StringInfo vacuumPrefix = makeStringInfo();
const int unsupportedFlags PG_USED_FOR_ASSERTS_ONLY = ~(
VACOPT_ANALYZE |
VACOPT_DISABLE_PAGE_SKIPPING |
VACOPT_FREEZE |
VACOPT_FULL |
VACOPT_VERBOSE
);
/* determine actual command and block out its bit */
/* determine actual command and block out its bits */
if (vacuumFlags & VACOPT_VACUUM)
{
appendStringInfoString(vacuumPrefix, "VACUUM ");
@ -252,6 +263,8 @@ DeparseVacuumStmtPrefix(int vacuumFlags)
}
else
{
Assert((vacuumFlags & VACOPT_ANALYZE) != 0);
appendStringInfoString(vacuumPrefix, "ANALYZE ");
vacuumFlags &= ~VACOPT_ANALYZE;
@ -262,11 +275,13 @@ DeparseVacuumStmtPrefix(int vacuumFlags)
}
}
/* unsupported flags should have already been rejected */
Assert((vacuumFlags & unsupportedFlags) == 0);
/* if no flags remain, exit early */
if (vacuumFlags == 0)
if (vacuumFlags == 0
#if PG_VERSION_NUM >= 120000
&& vacuumParams.truncate == VACOPT_TERNARY_DEFAULT &&
vacuumParams.index_cleanup == VACOPT_TERNARY_DEFAULT
#endif
)
{
return vacuumPrefix;
}
@ -299,6 +314,29 @@ DeparseVacuumStmtPrefix(int vacuumFlags)
appendStringInfoString(vacuumPrefix, "VERBOSE,");
}
#if PG_VERSION_NUM >= 120000
if (vacuumFlags & VACOPT_SKIP_LOCKED)
{
appendStringInfoString(vacuumPrefix, "SKIP_LOCKED,");
}
if (vacuumParams.truncate != VACOPT_TERNARY_DEFAULT)
{
appendStringInfoString(vacuumPrefix,
vacuumParams.truncate == VACOPT_TERNARY_ENABLED ?
"TRUNCATE," : "TRUNCATE false,"
);
}
if (vacuumParams.index_cleanup != VACOPT_TERNARY_DEFAULT)
{
appendStringInfoString(vacuumPrefix,
vacuumParams.index_cleanup == VACOPT_TERNARY_ENABLED ?
"INDEX_CLEANUP," : "INDEX_CLEANUP false,"
);
}
#endif
vacuumPrefix->data[vacuumPrefix->len - 1] = ')';
appendStringInfoChar(vacuumPrefix, ' ');
@ -339,3 +377,108 @@ DeparseVacuumColumnNames(List *columnNameList)
return columnNames->data;
}
/*
* VacuumStmtParams returns a CitusVacuumParams based on the supplied VacuumStmt.
*/
#if PG_VERSION_NUM >= 120000
/*
* This is mostly ExecVacuum from Postgres's commands/vacuum.c
*/
static CitusVacuumParams
VacuumStmtParams(VacuumStmt *vacstmt)
{
CitusVacuumParams params;
bool verbose = false;
bool skip_locked = false;
bool analyze = false;
bool freeze = false;
bool full = false;
bool disable_page_skipping = false;
ListCell *lc;
/* Set default value */
params.index_cleanup = VACOPT_TERNARY_DEFAULT;
params.truncate = VACOPT_TERNARY_DEFAULT;
/* Parse options list */
foreach(lc, vacstmt->options)
{
DefElem *opt = (DefElem *) lfirst(lc);
/* Parse common options for VACUUM and ANALYZE */
if (strcmp(opt->defname, "verbose") == 0)
{
verbose = defGetBoolean(opt);
}
else if (strcmp(opt->defname, "skip_locked") == 0)
{
skip_locked = defGetBoolean(opt);
}
else if (!vacstmt->is_vacuumcmd)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("unrecognized ANALYZE option \"%s\"", opt->defname)));
}
/* Parse options available on VACUUM */
else if (strcmp(opt->defname, "analyze") == 0)
{
analyze = defGetBoolean(opt);
}
else if (strcmp(opt->defname, "freeze") == 0)
{
freeze = defGetBoolean(opt);
}
else if (strcmp(opt->defname, "full") == 0)
{
full = defGetBoolean(opt);
}
else if (strcmp(opt->defname, "disable_page_skipping") == 0)
{
disable_page_skipping = defGetBoolean(opt);
}
else if (strcmp(opt->defname, "index_cleanup") == 0)
{
params.index_cleanup = defGetBoolean(opt) ? VACOPT_TERNARY_ENABLED :
VACOPT_TERNARY_DISABLED;
}
else if (strcmp(opt->defname, "truncate") == 0)
{
params.truncate = defGetBoolean(opt) ? VACOPT_TERNARY_ENABLED :
VACOPT_TERNARY_DISABLED;
}
else
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("unrecognized VACUUM option \"%s\"", opt->defname)
));
}
}
params.options = (vacstmt->is_vacuumcmd ? VACOPT_VACUUM : VACOPT_ANALYZE) |
(verbose ? VACOPT_VERBOSE : 0) |
(skip_locked ? VACOPT_SKIP_LOCKED : 0) |
(analyze ? VACOPT_ANALYZE : 0) |
(freeze ? VACOPT_FREEZE : 0) |
(full ? VACOPT_FULL : 0) |
(disable_page_skipping ? VACOPT_DISABLE_PAGE_SKIPPING : 0);
return params;
}
#else
static CitusVacuumParams
VacuumStmtParams(VacuumStmt *vacuumStmt)
{
CitusVacuumParams params;
params.options = vacuumStmt->options;
return params;
}
#endif