citus/src/backend/distributed/commands/vacuum.c

627 lines
17 KiB
C

/*-------------------------------------------------------------------------
*
* vacuum.c
* Commands for vacuuming distributed tables.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/pg_version_constants.h"
#if PG_VERSION_NUM >= PG_VERSION_12
#include "commands/defrem.h"
#endif
#include "commands/vacuum.h"
#include "distributed/adaptive_executor.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
#include "distributed/version_compat.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "postmaster/bgworker_internals.h"
#include "access/xact.h"
#define VACUUM_PARALLEL_NOTSET -2
/*
* Subset of VacuumParams we care about
*/
typedef struct CitusVacuumParams
{
int options;
#if PG_VERSION_NUM >= PG_VERSION_12
VacOptTernaryValue truncate;
VacOptTernaryValue index_cleanup;
#endif
#if PG_VERSION_NUM >= PG_VERSION_13
int nworkers;
#endif
} CitusVacuumParams;
/* Local functions forward declarations for processing distributed table commands */
static bool IsDistributedVacuumStmt(int vacuumOptions, List *VacuumCitusRelationIdList);
static List * VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams,
List *vacuumColumnList);
static char * DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams);
static char * DeparseVacuumColumnNames(List *columnNameList);
static List * VacuumColumnList(VacuumStmt *vacuumStmt, int relationIndex);
static List * ExtractVacuumTargetRels(VacuumStmt *vacuumStmt);
static void ExecuteVacuumOnDistributedTables(VacuumStmt *vacuumStmt, List *relationIdList,
CitusVacuumParams vacuumParams);
static CitusVacuumParams VacuumStmtParams(VacuumStmt *vacstmt);
static List * VacuumCitusRelationIdList(VacuumStmt *vacuumStmt, CitusVacuumParams
vacuumParams);
/*
* PostprocessVacuumStmt processes vacuum statements that may need propagation to
* distributed tables. If a VACUUM or ANALYZE command references a distributed
* table, it is propagated to all involved nodes; otherwise, this function will
* immediately exit after some error checking.
*
* Unlike most other Process functions within this file, this function does not
* return a modified parse node, as it is expected that the local VACUUM or
* ANALYZE has already been processed.
*/
void
PostprocessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
{
CitusVacuumParams vacuumParams = VacuumStmtParams(vacuumStmt);
const char *stmtName = (vacuumParams.options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
/*
* No table in the vacuum statement means vacuuming all relations
* which is not supported by citus.
*/
if (list_length(vacuumStmt->rels) == 0)
{
/* WARN for unqualified VACUUM commands */
ereport(WARNING, (errmsg("not propagating %s command to worker nodes", stmtName),
errhint("Provide a specific table in order to %s "
"distributed tables.", stmtName)));
}
List *citusRelationIdList = VacuumCitusRelationIdList(vacuumStmt, vacuumParams);
if (list_length(citusRelationIdList) == 0)
{
return;
}
if (vacuumParams.options & VACOPT_VACUUM)
{
/*
* We commit the current transaction here so that the global lock
* taken from the shell table for VACUUM is released, which would block execution
* of shard placements. We don't do this in case of "ANALYZE <table>" command because
* its semantics are different than VACUUM and it doesn't acquire the global lock.
*/
CommitTransactionCommand();
StartTransactionCommand();
}
/*
* Here we get the relation list again because we might have
* closed the current transaction and the memory context got reset.
* Vacuum's context is PortalContext, which lasts for the whole session
* so committing/starting a new transaction doesn't affect it.
*/
citusRelationIdList = VacuumCitusRelationIdList(vacuumStmt, vacuumParams);
bool distributedVacuumStmt = IsDistributedVacuumStmt(vacuumParams.options,
citusRelationIdList);
if (!distributedVacuumStmt)
{
return;
}
ExecuteVacuumOnDistributedTables(vacuumStmt, citusRelationIdList, vacuumParams);
}
/*
* VacuumCitusRelationIdList returns the oid of the relations in the given vacuum statement.
*/
static List *
VacuumCitusRelationIdList(VacuumStmt *vacuumStmt, CitusVacuumParams vacuumParams)
{
LOCKMODE lockMode = (vacuumParams.options & VACOPT_FULL) ? AccessExclusiveLock :
ShareUpdateExclusiveLock;
List *vacuumRelationList = ExtractVacuumTargetRels(vacuumStmt);
List *relationIdList = NIL;
RangeVar *vacuumRelation = NULL;
foreach_ptr(vacuumRelation, vacuumRelationList)
{
Oid relationId = RangeVarGetRelid(vacuumRelation, lockMode, false);
if (!IsCitusTable(relationId))
{
continue;
}
relationIdList = lappend_oid(relationIdList, relationId);
}
return relationIdList;
}
/*
* ExecuteVacuumOnDistributedTables executes the vacuum for the shard placements of given tables
* if they are citus tables.
*/
static void
ExecuteVacuumOnDistributedTables(VacuumStmt *vacuumStmt, List *relationIdList,
CitusVacuumParams vacuumParams)
{
int relationIndex = 0;
int executedVacuumCount = 0;
Oid relationId = InvalidOid;
foreach_oid(relationId, relationIdList)
{
if (IsCitusTable(relationId))
{
/*
* VACUUM commands cannot run inside a transaction block, so we use
* the "bare" commit protocol without BEGIN/COMMIT. However, ANALYZE
* commands can run inside a transaction block. Notice that we do this
* once even if there are multiple distributed tables to be vacuumed.
*/
if (executedVacuumCount == 0 && (vacuumParams.options & VACOPT_VACUUM) != 0)
{
/* save old commit protocol to restore at xact end */
Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE);
SavedMultiShardCommitProtocol = MultiShardCommitProtocol;
MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
}
List *vacuumColumnList = VacuumColumnList(vacuumStmt, relationIndex);
List *taskList = VacuumTaskList(relationId, vacuumParams, vacuumColumnList);
/* local execution is not implemented for VACUUM commands */
bool localExecutionSupported = false;
ExecuteUtilityTaskList(taskList, localExecutionSupported);
executedVacuumCount++;
}
relationIndex++;
}
}
/*
* IsSupportedDistributedVacuumStmt returns whether distributed execution of a
* given VacuumStmt is supported. The provided relationId list represents
* the list of tables targeted by the provided statement.
*
* Returns true if the statement requires distributed execution and returns
* false otherwise.
*/
static bool
IsDistributedVacuumStmt(int vacuumOptions, List *VacuumCitusRelationIdList)
{
bool distributeStmt = false;
int distributedRelationCount = 0;
const char *stmtName = (vacuumOptions & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
Oid relationId = InvalidOid;
foreach_oid(relationId, VacuumCitusRelationIdList)
{
if (OidIsValid(relationId) && IsCitusTable(relationId))
{
distributedRelationCount++;
}
}
if (distributedRelationCount == 0)
{
/* nothing to do here */
}
else if (!EnableDDLPropagation)
{
/* WARN if DDL propagation is not enabled */
ereport(WARNING, (errmsg("not propagating %s command to worker nodes", stmtName),
errhint("Set citus.enable_ddl_propagation to true in order to "
"send targeted %s commands to worker nodes.",
stmtName)));
}
else
{
distributeStmt = true;
}
return distributeStmt;
}
/*
* VacuumTaskList returns a list of tasks to be executed as part of processing
* a VacuumStmt which targets a distributed relation.
*/
static List *
VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColumnList)
{
/* resulting task list */
List *taskList = NIL;
/* enumerate the tasks when putting them to the taskList */
int taskId = 1;
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
char *relationName = get_rel_name(relationId);
const char *vacuumStringPrefix = DeparseVacuumStmtPrefix(vacuumParams);
const char *columnNames = DeparseVacuumColumnNames(vacuumColumnList);
/*
* We obtain ShareUpdateExclusiveLock here to not conflict with INSERT's
* RowExclusiveLock. However if VACUUM FULL is used, we already obtain
* AccessExclusiveLock before reaching to that point and INSERT's will be
* blocked anyway. This is inline with PostgreSQL's own behaviour.
*/
LockRelationOid(relationId, ShareUpdateExclusiveLock);
List *shardIntervalList = LoadShardIntervalList(relationId);
/* grab shard lock before getting placement list */
LockShardListMetadata(shardIntervalList, ShareLock);
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
{
uint64 shardId = shardInterval->shardId;
char *shardRelationName = pstrdup(relationName);
/* build shard relation name */
AppendShardIdToName(&shardRelationName, shardId);
char *quotedShardName = quote_qualified_identifier(schemaName, shardRelationName);
/* copy base vacuum string and build the shard specific command */
StringInfo vacuumStringForShard = makeStringInfo();
appendStringInfoString(vacuumStringForShard, vacuumStringPrefix);
appendStringInfoString(vacuumStringForShard, quotedShardName);
appendStringInfoString(vacuumStringForShard, columnNames);
Task *task = CitusMakeNode(Task);
task->jobId = INVALID_JOB_ID;
task->taskId = taskId++;
task->taskType = VACUUM_ANALYZE_TASK;
SetTaskQueryString(task, vacuumStringForShard->data);
task->dependentTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = shardId;
task->taskPlacementList = ActiveShardPlacementList(shardId);
taskList = lappend(taskList, task);
}
return taskList;
}
/*
* DeparseVacuumStmtPrefix returns a StringInfo appropriate for use as a prefix
* during distributed execution of a VACUUM or ANALYZE statement. Callers may
* reuse this prefix within a loop to generate shard-specific VACUUM or ANALYZE
* statements.
*/
static char *
DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams)
{
int vacuumFlags = vacuumParams.options;
StringInfo vacuumPrefix = makeStringInfo();
/* determine actual command and block out its bits */
if (vacuumFlags & VACOPT_VACUUM)
{
appendStringInfoString(vacuumPrefix, "VACUUM ");
vacuumFlags &= ~VACOPT_VACUUM;
}
else
{
Assert((vacuumFlags & VACOPT_ANALYZE) != 0);
appendStringInfoString(vacuumPrefix, "ANALYZE ");
vacuumFlags &= ~VACOPT_ANALYZE;
if (vacuumFlags & VACOPT_VERBOSE)
{
appendStringInfoString(vacuumPrefix, "VERBOSE ");
vacuumFlags &= ~VACOPT_VERBOSE;
}
}
/* if no flags remain, exit early */
if (vacuumFlags == 0
#if PG_VERSION_NUM >= PG_VERSION_12
&& vacuumParams.truncate == VACOPT_TERNARY_DEFAULT &&
vacuumParams.index_cleanup == VACOPT_TERNARY_DEFAULT
#endif
#if PG_VERSION_NUM >= PG_VERSION_13
&& vacuumParams.nworkers == VACUUM_PARALLEL_NOTSET
#endif
)
{
return vacuumPrefix->data;
}
/* otherwise, handle options */
appendStringInfoChar(vacuumPrefix, '(');
if (vacuumFlags & VACOPT_ANALYZE)
{
appendStringInfoString(vacuumPrefix, "ANALYZE,");
}
if (vacuumFlags & VACOPT_DISABLE_PAGE_SKIPPING)
{
appendStringInfoString(vacuumPrefix, "DISABLE_PAGE_SKIPPING,");
}
if (vacuumFlags & VACOPT_FREEZE)
{
appendStringInfoString(vacuumPrefix, "FREEZE,");
}
if (vacuumFlags & VACOPT_FULL)
{
appendStringInfoString(vacuumPrefix, "FULL,");
}
if (vacuumFlags & VACOPT_VERBOSE)
{
appendStringInfoString(vacuumPrefix, "VERBOSE,");
}
#if PG_VERSION_NUM >= PG_VERSION_12
if (vacuumFlags & VACOPT_SKIP_LOCKED)
{
appendStringInfoString(vacuumPrefix, "SKIP_LOCKED,");
}
if (vacuumParams.truncate != VACOPT_TERNARY_DEFAULT)
{
appendStringInfoString(vacuumPrefix,
vacuumParams.truncate == VACOPT_TERNARY_ENABLED ?
"TRUNCATE," : "TRUNCATE false,"
);
}
if (vacuumParams.index_cleanup != VACOPT_TERNARY_DEFAULT)
{
appendStringInfoString(vacuumPrefix,
vacuumParams.index_cleanup == VACOPT_TERNARY_ENABLED ?
"INDEX_CLEANUP," : "INDEX_CLEANUP false,"
);
}
#endif
#if PG_VERSION_NUM >= PG_VERSION_13
if (vacuumParams.nworkers != VACUUM_PARALLEL_NOTSET)
{
appendStringInfo(vacuumPrefix, "PARALLEL %d,", vacuumParams.nworkers);
}
#endif
vacuumPrefix->data[vacuumPrefix->len - 1] = ')';
appendStringInfoChar(vacuumPrefix, ' ');
return vacuumPrefix->data;
}
/*
* DeparseVacuumColumnNames joins the list of strings using commas as a
* delimiter. The whole thing is placed in parenthesis and set off with a
* single space in order to facilitate appending it to the end of any VACUUM
* or ANALYZE command which uses explicit column names. If the provided list
* is empty, this function returns an empty string to keep the calling code
* simplest.
*/
static char *
DeparseVacuumColumnNames(List *columnNameList)
{
StringInfo columnNames = makeStringInfo();
if (columnNameList == NIL)
{
return columnNames->data;
}
appendStringInfoString(columnNames, " (");
Value *columnName = NULL;
foreach_ptr(columnName, columnNameList)
{
appendStringInfo(columnNames, "%s,", strVal(columnName));
}
columnNames->data[columnNames->len - 1] = ')';
return columnNames->data;
}
/*
* VacuumColumnList returns list of columns from relation
* in the vacuum statement at specified relationIndex.
*/
static List *
VacuumColumnList(VacuumStmt *vacuumStmt, int relationIndex)
{
VacuumRelation *vacuumRelation = (VacuumRelation *) list_nth(vacuumStmt->rels,
relationIndex);
return vacuumRelation->va_cols;
}
/*
* ExtractVacuumTargetRels returns list of target
* relations from vacuum statement.
*/
static List *
ExtractVacuumTargetRels(VacuumStmt *vacuumStmt)
{
List *vacuumList = NIL;
VacuumRelation *vacuumRelation = NULL;
foreach_ptr(vacuumRelation, vacuumStmt->rels)
{
vacuumList = lappend(vacuumList, vacuumRelation->relation);
}
return vacuumList;
}
/*
* VacuumStmtParams returns a CitusVacuumParams based on the supplied VacuumStmt.
*/
#if PG_VERSION_NUM >= PG_VERSION_12
/*
* This is mostly ExecVacuum from Postgres's commands/vacuum.c
* Note that ExecVacuum does an actual vacuum as well and we don't want
* that to happen in the coordinator hence we copied the rest here.
*/
static CitusVacuumParams
VacuumStmtParams(VacuumStmt *vacstmt)
{
CitusVacuumParams params;
bool verbose = false;
bool skip_locked = false;
bool analyze = false;
bool freeze = false;
bool full = false;
bool disable_page_skipping = false;
/* Set default value */
params.index_cleanup = VACOPT_TERNARY_DEFAULT;
params.truncate = VACOPT_TERNARY_DEFAULT;
#if PG_VERSION_NUM >= PG_VERSION_13
params.nworkers = VACUUM_PARALLEL_NOTSET;
#endif
/* Parse options list */
DefElem *opt = NULL;
foreach_ptr(opt, vacstmt->options)
{
/* Parse common options for VACUUM and ANALYZE */
if (strcmp(opt->defname, "verbose") == 0)
{
verbose = defGetBoolean(opt);
}
else if (strcmp(opt->defname, "skip_locked") == 0)
{
skip_locked = defGetBoolean(opt);
}
else if (!vacstmt->is_vacuumcmd)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("unrecognized ANALYZE option \"%s\"", opt->defname)));
}
/* Parse options available on VACUUM */
else if (strcmp(opt->defname, "analyze") == 0)
{
analyze = defGetBoolean(opt);
}
else if (strcmp(opt->defname, "freeze") == 0)
{
freeze = defGetBoolean(opt);
}
else if (strcmp(opt->defname, "full") == 0)
{
full = defGetBoolean(opt);
}
else if (strcmp(opt->defname, "disable_page_skipping") == 0)
{
disable_page_skipping = defGetBoolean(opt);
}
else if (strcmp(opt->defname, "index_cleanup") == 0)
{
params.index_cleanup = defGetBoolean(opt) ? VACOPT_TERNARY_ENABLED :
VACOPT_TERNARY_DISABLED;
}
else if (strcmp(opt->defname, "truncate") == 0)
{
params.truncate = defGetBoolean(opt) ? VACOPT_TERNARY_ENABLED :
VACOPT_TERNARY_DISABLED;
}
#if PG_VERSION_NUM >= PG_VERSION_13
else if (strcmp(opt->defname, "parallel") == 0)
{
if (opt->arg == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("parallel option requires a value between 0 and %d",
MAX_PARALLEL_WORKER_LIMIT)));
}
else
{
int nworkers = defGetInt32(opt);
if (nworkers < 0 || nworkers > MAX_PARALLEL_WORKER_LIMIT)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("parallel vacuum degree must be between 0 and %d",
MAX_PARALLEL_WORKER_LIMIT)));
}
params.nworkers = nworkers;
}
}
#endif
else
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("unrecognized VACUUM option \"%s\"", opt->defname)
));
}
}
params.options = (vacstmt->is_vacuumcmd ? VACOPT_VACUUM : VACOPT_ANALYZE) |
(verbose ? VACOPT_VERBOSE : 0) |
(skip_locked ? VACOPT_SKIP_LOCKED : 0) |
(analyze ? VACOPT_ANALYZE : 0) |
(freeze ? VACOPT_FREEZE : 0) |
(full ? VACOPT_FULL : 0) |
(disable_page_skipping ? VACOPT_DISABLE_PAGE_SKIPPING : 0);
return params;
}
#else
static CitusVacuumParams
VacuumStmtParams(VacuumStmt *vacuumStmt)
{
CitusVacuumParams params;
params.options = vacuumStmt->options;
return params;
}
#endif