mirror of https://github.com/citusdata/citus.git
Add targeted VACUUM/ANALYZE support
Adds support for VACUUM and ANALYZE commands which target a specific distributed table. After grabbing the appropriate locks, this imple- mentation sends VACUUM commands to each placement (using one connec- tion per placement). These commands are sent in parallel, so users with large tables will benefit from sharding. Except for VERBOSE, all VACUUM and ANALYZE options are supported, including the explicit column list used by ANALYZE. As with many of our utility commands, the local command also runs. In the VACUUM/ANALYZE case, the local command is executed before any re- mote propagation. Because error handling is managed after local proc- essing, this can result in a VACUUM completing locally but erroring out when distributed processing commences: a minor technicality in all cases, as there isn't really much reason to ever roll back a VACUUM (an impossibility in any case, as VACUUM cannot run within a transaction). Remote propagation of targeted VACUUM/ANALYZE is controlled by the enable_ddl_propagation setting; warnings are emitted if such a command is attempted when DDL propagation is disabled. Unqualified VACUUM or ANALYZE is not handled, but a warning message informs the user of this. Implementation note: this commit adds a "BARE" value to MultiShard- CommitProtocol. When active, no BEGIN command is ever sent to remote nodes, useful for commands such as VACUUM/ANALYZE which must not run in a transaction block. This value is not user-facing and is reset at transaction end.pull/1938/head
parent
54f18a1c88
commit
54c2efccc1
|
@ -46,6 +46,7 @@
|
|||
#include "distributed/multi_utility.h" /* IWYU pragma: keep */
|
||||
#include "distributed/pg_dist_partition.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/transaction_management.h"
|
||||
#include "distributed/transmit.h"
|
||||
#include "distributed/worker_protocol.h"
|
||||
#include "executor/executor.h"
|
||||
|
@ -109,6 +110,12 @@ static Node * ProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
|
|||
static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
||||
const char *alterObjectSchemaCommand,
|
||||
bool isTopLevel);
|
||||
static void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);
|
||||
static bool IsSupportedDistributedVacuumStmt(Oid relationId, VacuumStmt *vacuumStmt);
|
||||
static List * VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt);
|
||||
static StringInfo DeparseVacuumStmtPrefix(VacuumStmt *vacuumStmt);
|
||||
static char * DeparseVacuumColumnNames(List *columnNameList);
|
||||
|
||||
|
||||
/* Local functions forward declarations for unsupported command checks */
|
||||
static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement);
|
||||
|
@ -366,6 +373,14 @@ multi_ProcessUtility(Node *parsetree,
|
|||
{
|
||||
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
||||
}
|
||||
|
||||
/* we run VacuumStmt after standard hook to benefit from its checks and locking */
|
||||
if (IsA(parsetree, VacuumStmt))
|
||||
{
|
||||
VacuumStmt *vacuumStmt = (VacuumStmt *) parsetree;
|
||||
|
||||
ProcessVacuumStmt(vacuumStmt, queryString);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -882,6 +897,267 @@ ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ProcessVacuumStmt processes vacuum statements that may need propagation to
|
||||
* distributed tables. If a VACUUM or ANALYZE command references a distributed
|
||||
* table, it is propagated to all involved nodes; otherwise, this function will
|
||||
* immediately exit after some error checking.
|
||||
*
|
||||
* Unlike most other Process functions within this file, this function does not
|
||||
* return a modified parse node, as it is expected that the local VACUUM or
|
||||
* ANALYZE has already been processed.
|
||||
*/
|
||||
static void
|
||||
ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
|
||||
{
|
||||
Oid relationId = InvalidOid;
|
||||
List *taskList = NIL;
|
||||
bool supportedVacuumStmt = false;
|
||||
|
||||
if (vacuumStmt->relation != NULL)
|
||||
{
|
||||
LOCKMODE lockMode = (vacuumStmt->options & VACOPT_FULL) ?
|
||||
AccessExclusiveLock : ShareUpdateExclusiveLock;
|
||||
|
||||
relationId = RangeVarGetRelid(vacuumStmt->relation, lockMode, false);
|
||||
|
||||
if (relationId == InvalidOid)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
supportedVacuumStmt = IsSupportedDistributedVacuumStmt(relationId, vacuumStmt);
|
||||
if (!supportedVacuumStmt)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
taskList = VacuumTaskList(relationId, vacuumStmt);
|
||||
|
||||
/* save old commit protocol to restore at xact end */
|
||||
Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE);
|
||||
SavedMultiShardCommitProtocol = MultiShardCommitProtocol;
|
||||
MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
|
||||
|
||||
ExecuteModifyTasksWithoutResults(taskList);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsSupportedDistributedVacuumStmt returns whether distributed execution of a
|
||||
* given VacuumStmt is supported. The provided relationId (if valid) represents
|
||||
* the table targeted by the provided statement.
|
||||
*
|
||||
* Returns true if the statement requires distributed execution and returns
|
||||
* false otherwise; however, this function will raise errors if the provided
|
||||
* statement needs distributed execution but contains unsupported options.
|
||||
*/
|
||||
static bool
|
||||
IsSupportedDistributedVacuumStmt(Oid relationId, VacuumStmt *vacuumStmt)
|
||||
{
|
||||
const char *stmtName = (vacuumStmt->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
|
||||
|
||||
if (vacuumStmt->relation == NULL)
|
||||
{
|
||||
/* WARN and exit early for unqualified VACUUM commands */
|
||||
ereport(WARNING, (errmsg("not propagating %s command to worker nodes", stmtName),
|
||||
errhint("Provide a specific table in order to %s "
|
||||
"distributed tables.", stmtName)));
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!OidIsValid(relationId) || !IsDistributedTable(relationId))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!EnableDDLPropagation)
|
||||
{
|
||||
/* WARN and exit early if DDL propagation is not enabled */
|
||||
ereport(WARNING, (errmsg("not propagating %s command to worker nodes", stmtName),
|
||||
errhint("Set citus.enable_ddl_propagation to true in order to "
|
||||
"send targeted %s commands to worker nodes.",
|
||||
stmtName)));
|
||||
}
|
||||
|
||||
if (vacuumStmt->options & VACOPT_VERBOSE)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("the VERBOSE option is currently unsupported in "
|
||||
"distributed %s commands", stmtName)));
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* VacuumTaskList returns a list of tasks to be executed as part of processing
|
||||
* a VacuumStmt which targets a distributed relation.
|
||||
*/
|
||||
static List *
|
||||
VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt)
|
||||
{
|
||||
List *taskList = NIL;
|
||||
List *shardIntervalList = NIL;
|
||||
ListCell *shardIntervalCell = NULL;
|
||||
uint64 jobId = INVALID_JOB_ID;
|
||||
int taskId = 1;
|
||||
StringInfo vacuumString = DeparseVacuumStmtPrefix(vacuumStmt);
|
||||
const char *columnNames = DeparseVacuumColumnNames(vacuumStmt->va_cols);
|
||||
const int vacuumPrefixLen = vacuumString->len;
|
||||
Oid schemaId = get_rel_namespace(relationId);
|
||||
char *schemaName = get_namespace_name(schemaId);
|
||||
char *tableName = get_rel_name(relationId);
|
||||
|
||||
/* lock relation metadata before getting shard list */
|
||||
LockRelationDistributionMetadata(relationId, ShareLock);
|
||||
|
||||
shardIntervalList = LoadShardIntervalList(relationId);
|
||||
|
||||
/* grab shard lock before getting placement list */
|
||||
LockShardListMetadata(shardIntervalList, ShareLock);
|
||||
|
||||
foreach(shardIntervalCell, shardIntervalList)
|
||||
{
|
||||
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
|
||||
uint64 shardId = shardInterval->shardId;
|
||||
Task *task = NULL;
|
||||
|
||||
char *shardName = pstrdup(tableName);
|
||||
AppendShardIdToName(&shardName, shardInterval->shardId);
|
||||
shardName = quote_qualified_identifier(schemaName, shardName);
|
||||
|
||||
vacuumString->len = vacuumPrefixLen;
|
||||
appendStringInfoString(vacuumString, shardName);
|
||||
appendStringInfoString(vacuumString, columnNames);
|
||||
|
||||
task = CitusMakeNode(Task);
|
||||
task->jobId = jobId;
|
||||
task->taskId = taskId++;
|
||||
task->taskType = SQL_TASK;
|
||||
task->queryString = pstrdup(vacuumString->data);
|
||||
task->dependedTaskList = NULL;
|
||||
task->anchorShardId = shardId;
|
||||
task->taskPlacementList = FinalizedShardPlacementList(shardId);
|
||||
|
||||
taskList = lappend(taskList, task);
|
||||
}
|
||||
|
||||
return taskList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DeparseVacuumStmtPrefix returns a StringInfo appropriate for use as a prefix
|
||||
* during distributed execution of a VACUUM or ANALYZE statement. Callers may
|
||||
* reuse this prefix within a loop to generate shard-specific VACUUM or ANALYZE
|
||||
* statements.
|
||||
*/
|
||||
static StringInfo
|
||||
DeparseVacuumStmtPrefix(VacuumStmt *vacuumStmt)
|
||||
{
|
||||
StringInfo vacuumPrefix = makeStringInfo();
|
||||
int vacuumFlags = vacuumStmt->options;
|
||||
const int unsupportedFlags PG_USED_FOR_ASSERTS_ONLY = ~(
|
||||
VACOPT_ANALYZE |
|
||||
#if (PG_VERSION_NUM >= 90600)
|
||||
VACOPT_DISABLE_PAGE_SKIPPING |
|
||||
#endif
|
||||
VACOPT_FREEZE |
|
||||
VACOPT_FULL
|
||||
);
|
||||
|
||||
/* determine actual command and block out its bit */
|
||||
if (vacuumFlags & VACOPT_VACUUM)
|
||||
{
|
||||
appendStringInfoString(vacuumPrefix, "VACUUM ");
|
||||
vacuumFlags &= ~VACOPT_VACUUM;
|
||||
}
|
||||
else
|
||||
{
|
||||
appendStringInfoString(vacuumPrefix, "ANALYZE ");
|
||||
vacuumFlags &= ~VACOPT_ANALYZE;
|
||||
}
|
||||
|
||||
/* unsupported flags should have already been rejected */
|
||||
Assert((vacuumFlags & unsupportedFlags) == 0);
|
||||
|
||||
/* if no flags remain, exit early */
|
||||
if (vacuumFlags == 0)
|
||||
{
|
||||
return vacuumPrefix;
|
||||
}
|
||||
|
||||
/* otherwise, handle options */
|
||||
appendStringInfoChar(vacuumPrefix, '(');
|
||||
|
||||
if (vacuumFlags & VACOPT_ANALYZE)
|
||||
{
|
||||
appendStringInfoString(vacuumPrefix, "ANALYZE,");
|
||||
}
|
||||
|
||||
#if (PG_VERSION_NUM >= 90600)
|
||||
if (vacuumFlags & VACOPT_DISABLE_PAGE_SKIPPING)
|
||||
{
|
||||
appendStringInfoString(vacuumPrefix, "DISABLE_PAGE_SKIPPING,");
|
||||
}
|
||||
#endif
|
||||
|
||||
if (vacuumFlags & VACOPT_FREEZE)
|
||||
{
|
||||
appendStringInfoString(vacuumPrefix, "FREEZE,");
|
||||
}
|
||||
|
||||
if (vacuumFlags & VACOPT_FULL)
|
||||
{
|
||||
appendStringInfoString(vacuumPrefix, "FULL,");
|
||||
}
|
||||
|
||||
vacuumPrefix->data[vacuumPrefix->len - 1] = ')';
|
||||
|
||||
appendStringInfoChar(vacuumPrefix, ' ');
|
||||
|
||||
return vacuumPrefix;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DeparseVacuumColumnNames joins the list of strings using commas as a
|
||||
* delimiter. The whole thing is placed in parenthesis and set off with a
|
||||
* single space in order to facilitate appending it to the end of any VACUUM
|
||||
* or ANALYZE command which uses explicit column names. If the provided list
|
||||
* is empty, this function returns an empty string to keep the calling code
|
||||
* simplest.
|
||||
*/
|
||||
static char *
|
||||
DeparseVacuumColumnNames(List *columnNameList)
|
||||
{
|
||||
StringInfo columnNames = makeStringInfo();
|
||||
ListCell *columnNameCell = NULL;
|
||||
|
||||
if (columnNameList == NIL)
|
||||
{
|
||||
return columnNames->data;
|
||||
}
|
||||
|
||||
appendStringInfoString(columnNames, " (");
|
||||
|
||||
foreach(columnNameCell, columnNameList)
|
||||
{
|
||||
char *columnName = strVal(lfirst(columnNameCell));
|
||||
|
||||
appendStringInfo(columnNames, "%s,", columnName);
|
||||
}
|
||||
|
||||
columnNames->data[columnNames->len - 1] = ')';
|
||||
|
||||
return columnNames->data;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfUnsupportedIndexStmt checks if the corresponding index statement is
|
||||
* supported for distributed tables and errors out if it is not.
|
||||
|
|
|
@ -176,8 +176,12 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName)
|
|||
*/
|
||||
MarkRemoteTransactionCritical(connection);
|
||||
|
||||
/* issue BEGIN */
|
||||
RemoteTransactionBegin(connection);
|
||||
/* the special BARE mode (for e.g. VACUUM/ANALYZE) skips BEGIN */
|
||||
if (MultiShardCommitProtocol > COMMIT_PROTOCOL_BARE)
|
||||
{
|
||||
/* issue BEGIN */
|
||||
RemoteTransactionBegin(connection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -270,6 +274,12 @@ ResetShardPlacementTransactionState(void)
|
|||
* round.
|
||||
*/
|
||||
shardConnectionHash = NULL;
|
||||
|
||||
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE)
|
||||
{
|
||||
MultiShardCommitProtocol = SavedMultiShardCommitProtocol;
|
||||
SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NON
|
|||
|
||||
/* GUC, the commit protocol to use for commands affecting more than one connection */
|
||||
int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC;
|
||||
int SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
|
||||
|
||||
/* state needed to keep track of operations used during a transaction */
|
||||
XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||
|
|
|
@ -47,13 +47,17 @@ typedef enum CoordinatedTransactionState
|
|||
/* Enumeration that defines the different commit protocols available */
|
||||
typedef enum
|
||||
{
|
||||
COMMIT_PROTOCOL_1PC = 0,
|
||||
COMMIT_PROTOCOL_2PC = 1
|
||||
COMMIT_PROTOCOL_BARE = 0,
|
||||
COMMIT_PROTOCOL_1PC = 1,
|
||||
COMMIT_PROTOCOL_2PC = 2
|
||||
} CommitProtocolType;
|
||||
|
||||
/* config variable managed via guc.c */
|
||||
extern int MultiShardCommitProtocol;
|
||||
|
||||
/* state needed to restore multi-shard commit protocol during VACUUM/ANALYZE */
|
||||
extern int SavedMultiShardCommitProtocol;
|
||||
|
||||
/* state needed to prevent new connections during modifying transactions */
|
||||
extern XactModificationType XactModificationLevel;
|
||||
|
||||
|
|
|
@ -66,3 +66,178 @@ SELECT master_apply_delete_command('DELETE FROM sharded_table');
|
|||
|
||||
-- drop table
|
||||
DROP TABLE sharded_table;
|
||||
-- VACUUM tests
|
||||
-- create a table with a single shard (for convenience)
|
||||
CREATE TABLE dustbunnies (id integer, name text, age integer);
|
||||
SELECT master_create_distributed_table('dustbunnies', 'id', 'hash');
|
||||
master_create_distributed_table
|
||||
---------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_create_worker_shards('dustbunnies', 1, 2);
|
||||
master_create_worker_shards
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- add some data to the distributed table
|
||||
\copy dustbunnies (id, name) from stdin with csv
|
||||
-- following approach adapted from PostgreSQL's stats.sql file
|
||||
-- save relevant stat counter values in refreshable view
|
||||
\c - - - :worker_1_port
|
||||
CREATE MATERIALIZED VIEW prevcounts AS
|
||||
SELECT analyze_count, vacuum_count FROM pg_stat_user_tables
|
||||
WHERE relname='dustbunnies_990002';
|
||||
-- create function that sleeps until those counters increment
|
||||
create function wait_for_stats() returns void as $$
|
||||
declare
|
||||
start_time timestamptz := clock_timestamp();
|
||||
analyze_updated bool;
|
||||
vacuum_updated bool;
|
||||
begin
|
||||
-- we don't want to wait forever; loop will exit after 10 seconds
|
||||
for i in 1 .. 100 loop
|
||||
|
||||
-- check to see if analyze has been updated
|
||||
SELECT (st.analyze_count >= pc.analyze_count + 1) INTO analyze_updated
|
||||
FROM pg_stat_user_tables AS st, pg_class AS cl, prevcounts AS pc
|
||||
WHERE st.relname='dustbunnies_990002' AND cl.relname='dustbunnies_990002';
|
||||
|
||||
-- check to see if vacuum has been updated
|
||||
SELECT (st.vacuum_count >= pc.vacuum_count + 1) INTO vacuum_updated
|
||||
FROM pg_stat_user_tables AS st, pg_class AS cl, prevcounts AS pc
|
||||
WHERE st.relname='dustbunnies_990002' AND cl.relname='dustbunnies_990002';
|
||||
|
||||
exit when analyze_updated or vacuum_updated;
|
||||
|
||||
-- wait a little
|
||||
perform pg_sleep(0.1);
|
||||
|
||||
-- reset stats snapshot so we can test again
|
||||
perform pg_stat_clear_snapshot();
|
||||
|
||||
end loop;
|
||||
|
||||
-- report time waited in postmaster log (where it won't change test output)
|
||||
raise log 'wait_for_stats delayed % seconds',
|
||||
extract(epoch from clock_timestamp() - start_time);
|
||||
end
|
||||
$$ language plpgsql;
|
||||
-- run VACUUM and ANALYZE against the table on the master
|
||||
\c - - - :master_port
|
||||
VACUUM dustbunnies;
|
||||
ANALYZE dustbunnies;
|
||||
-- verify that the VACUUM and ANALYZE ran
|
||||
\c - - - :worker_1_port
|
||||
SELECT wait_for_stats();
|
||||
wait_for_stats
|
||||
----------------
|
||||
|
||||
(1 row)
|
||||
|
||||
REFRESH MATERIALIZED VIEW prevcounts;
|
||||
SELECT pg_stat_get_vacuum_count('dustbunnies_990002'::regclass);
|
||||
pg_stat_get_vacuum_count
|
||||
--------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
SELECT pg_stat_get_analyze_count('dustbunnies_990002'::regclass);
|
||||
pg_stat_get_analyze_count
|
||||
---------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
-- get file node to verify VACUUM FULL
|
||||
SELECT relfilenode AS oldnode FROM pg_class WHERE oid='dustbunnies_990002'::regclass
|
||||
\gset
|
||||
-- send a VACUUM FULL and a VACUUM ANALYZE
|
||||
\c - - - :master_port
|
||||
VACUUM (FULL) dustbunnies;
|
||||
VACUUM ANALYZE dustbunnies;
|
||||
-- verify that relfilenode changed
|
||||
\c - - - :worker_1_port
|
||||
SELECT relfilenode != :oldnode AS table_rewritten FROM pg_class
|
||||
WHERE oid='dustbunnies_990002'::regclass;
|
||||
table_rewritten
|
||||
-----------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- verify the VACUUM ANALYZE incremented both vacuum and analyze counts
|
||||
SELECT wait_for_stats();
|
||||
wait_for_stats
|
||||
----------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT pg_stat_get_vacuum_count('dustbunnies_990002'::regclass);
|
||||
pg_stat_get_vacuum_count
|
||||
--------------------------
|
||||
2
|
||||
(1 row)
|
||||
|
||||
SELECT pg_stat_get_analyze_count('dustbunnies_990002'::regclass);
|
||||
pg_stat_get_analyze_count
|
||||
---------------------------
|
||||
2
|
||||
(1 row)
|
||||
|
||||
-- disable auto-VACUUM for next test
|
||||
ALTER TABLE dustbunnies_990002 SET (autovacuum_enabled = false);
|
||||
SELECT relfrozenxid AS frozenxid FROM pg_class WHERE oid='dustbunnies_990002'::regclass
|
||||
\gset
|
||||
-- send a VACUUM FREEZE after adding a new row
|
||||
\c - - - :master_port
|
||||
INSERT INTO dustbunnies VALUES (5, 'peter');
|
||||
VACUUM (FREEZE) dustbunnies;
|
||||
-- verify that relfrozenxid increased
|
||||
\c - - - :worker_1_port
|
||||
SELECT relfrozenxid::text::integer > :frozenxid AS frozen_performed FROM pg_class
|
||||
WHERE oid='dustbunnies_990002'::regclass;
|
||||
frozen_performed
|
||||
------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- check there are no nulls in either column
|
||||
SELECT attname, null_frac FROM pg_stats
|
||||
WHERE tablename = 'dustbunnies_990002' ORDER BY attname;
|
||||
attname | null_frac
|
||||
---------+-----------
|
||||
age | 1
|
||||
id | 0
|
||||
name | 0
|
||||
(3 rows)
|
||||
|
||||
-- add NULL values, then perform column-specific ANALYZE
|
||||
\c - - - :master_port
|
||||
INSERT INTO dustbunnies VALUES (6, NULL, NULL);
|
||||
ANALYZE dustbunnies (name);
|
||||
-- verify that name's NULL ratio is updated but age's is not
|
||||
\c - - - :worker_1_port
|
||||
SELECT attname, null_frac FROM pg_stats
|
||||
WHERE tablename = 'dustbunnies_990002' ORDER BY attname;
|
||||
attname | null_frac
|
||||
---------+-----------
|
||||
age | 1
|
||||
id | 0
|
||||
name | 0.166667
|
||||
(3 rows)
|
||||
|
||||
\c - - - :master_port
|
||||
-- verify warning for unqualified VACUUM
|
||||
VACUUM;
|
||||
WARNING: not propagating VACUUM command to worker nodes
|
||||
HINT: Provide a specific table in order to VACUUM distributed tables.
|
||||
-- and warning when using targeted VACUUM without DDL propagation
|
||||
SET citus.enable_ddl_propagation to false;
|
||||
VACUUM dustbunnies;
|
||||
WARNING: not propagating VACUUM command to worker nodes
|
||||
HINT: Set citus.enable_ddl_propagation to true in order to send targeted VACUUM commands to worker nodes.
|
||||
SET citus.enable_ddl_propagation to DEFAULT;
|
||||
-- TODO: support VERBOSE
|
||||
-- VACUUM VERBOSE dustbunnies;
|
||||
-- VACUUM (FULL, VERBOSE) dustbunnies;
|
||||
-- ANALYZE VERBOSE dustbunnies;
|
||||
|
|
|
@ -40,3 +40,136 @@ SELECT master_apply_delete_command('DELETE FROM sharded_table');
|
|||
|
||||
-- drop table
|
||||
DROP TABLE sharded_table;
|
||||
|
||||
-- VACUUM tests
|
||||
|
||||
-- create a table with a single shard (for convenience)
|
||||
CREATE TABLE dustbunnies (id integer, name text, age integer);
|
||||
SELECT master_create_distributed_table('dustbunnies', 'id', 'hash');
|
||||
SELECT master_create_worker_shards('dustbunnies', 1, 2);
|
||||
|
||||
-- add some data to the distributed table
|
||||
\copy dustbunnies (id, name) from stdin with csv
|
||||
1,bugs
|
||||
2,babs
|
||||
3,buster
|
||||
4,roger
|
||||
\.
|
||||
|
||||
-- following approach adapted from PostgreSQL's stats.sql file
|
||||
|
||||
-- save relevant stat counter values in refreshable view
|
||||
\c - - - :worker_1_port
|
||||
CREATE MATERIALIZED VIEW prevcounts AS
|
||||
SELECT analyze_count, vacuum_count FROM pg_stat_user_tables
|
||||
WHERE relname='dustbunnies_990002';
|
||||
|
||||
-- create function that sleeps until those counters increment
|
||||
create function wait_for_stats() returns void as $$
|
||||
declare
|
||||
start_time timestamptz := clock_timestamp();
|
||||
analyze_updated bool;
|
||||
vacuum_updated bool;
|
||||
begin
|
||||
-- we don't want to wait forever; loop will exit after 10 seconds
|
||||
for i in 1 .. 100 loop
|
||||
|
||||
-- check to see if analyze has been updated
|
||||
SELECT (st.analyze_count >= pc.analyze_count + 1) INTO analyze_updated
|
||||
FROM pg_stat_user_tables AS st, pg_class AS cl, prevcounts AS pc
|
||||
WHERE st.relname='dustbunnies_990002' AND cl.relname='dustbunnies_990002';
|
||||
|
||||
-- check to see if vacuum has been updated
|
||||
SELECT (st.vacuum_count >= pc.vacuum_count + 1) INTO vacuum_updated
|
||||
FROM pg_stat_user_tables AS st, pg_class AS cl, prevcounts AS pc
|
||||
WHERE st.relname='dustbunnies_990002' AND cl.relname='dustbunnies_990002';
|
||||
|
||||
exit when analyze_updated or vacuum_updated;
|
||||
|
||||
-- wait a little
|
||||
perform pg_sleep(0.1);
|
||||
|
||||
-- reset stats snapshot so we can test again
|
||||
perform pg_stat_clear_snapshot();
|
||||
|
||||
end loop;
|
||||
|
||||
-- report time waited in postmaster log (where it won't change test output)
|
||||
raise log 'wait_for_stats delayed % seconds',
|
||||
extract(epoch from clock_timestamp() - start_time);
|
||||
end
|
||||
$$ language plpgsql;
|
||||
|
||||
-- run VACUUM and ANALYZE against the table on the master
|
||||
\c - - - :master_port
|
||||
VACUUM dustbunnies;
|
||||
ANALYZE dustbunnies;
|
||||
|
||||
-- verify that the VACUUM and ANALYZE ran
|
||||
\c - - - :worker_1_port
|
||||
SELECT wait_for_stats();
|
||||
REFRESH MATERIALIZED VIEW prevcounts;
|
||||
SELECT pg_stat_get_vacuum_count('dustbunnies_990002'::regclass);
|
||||
SELECT pg_stat_get_analyze_count('dustbunnies_990002'::regclass);
|
||||
|
||||
-- get file node to verify VACUUM FULL
|
||||
SELECT relfilenode AS oldnode FROM pg_class WHERE oid='dustbunnies_990002'::regclass
|
||||
\gset
|
||||
|
||||
-- send a VACUUM FULL and a VACUUM ANALYZE
|
||||
\c - - - :master_port
|
||||
VACUUM (FULL) dustbunnies;
|
||||
VACUUM ANALYZE dustbunnies;
|
||||
|
||||
-- verify that relfilenode changed
|
||||
\c - - - :worker_1_port
|
||||
SELECT relfilenode != :oldnode AS table_rewritten FROM pg_class
|
||||
WHERE oid='dustbunnies_990002'::regclass;
|
||||
|
||||
-- verify the VACUUM ANALYZE incremented both vacuum and analyze counts
|
||||
SELECT wait_for_stats();
|
||||
SELECT pg_stat_get_vacuum_count('dustbunnies_990002'::regclass);
|
||||
SELECT pg_stat_get_analyze_count('dustbunnies_990002'::regclass);
|
||||
|
||||
-- disable auto-VACUUM for next test
|
||||
ALTER TABLE dustbunnies_990002 SET (autovacuum_enabled = false);
|
||||
SELECT relfrozenxid AS frozenxid FROM pg_class WHERE oid='dustbunnies_990002'::regclass
|
||||
\gset
|
||||
|
||||
-- send a VACUUM FREEZE after adding a new row
|
||||
\c - - - :master_port
|
||||
INSERT INTO dustbunnies VALUES (5, 'peter');
|
||||
VACUUM (FREEZE) dustbunnies;
|
||||
|
||||
-- verify that relfrozenxid increased
|
||||
\c - - - :worker_1_port
|
||||
SELECT relfrozenxid::text::integer > :frozenxid AS frozen_performed FROM pg_class
|
||||
WHERE oid='dustbunnies_990002'::regclass;
|
||||
|
||||
-- check there are no nulls in either column
|
||||
SELECT attname, null_frac FROM pg_stats
|
||||
WHERE tablename = 'dustbunnies_990002' ORDER BY attname;
|
||||
|
||||
-- add NULL values, then perform column-specific ANALYZE
|
||||
\c - - - :master_port
|
||||
INSERT INTO dustbunnies VALUES (6, NULL, NULL);
|
||||
ANALYZE dustbunnies (name);
|
||||
|
||||
-- verify that name's NULL ratio is updated but age's is not
|
||||
\c - - - :worker_1_port
|
||||
SELECT attname, null_frac FROM pg_stats
|
||||
WHERE tablename = 'dustbunnies_990002' ORDER BY attname;
|
||||
|
||||
\c - - - :master_port
|
||||
-- verify warning for unqualified VACUUM
|
||||
VACUUM;
|
||||
|
||||
-- and warning when using targeted VACUUM without DDL propagation
|
||||
SET citus.enable_ddl_propagation to false;
|
||||
VACUUM dustbunnies;
|
||||
SET citus.enable_ddl_propagation to DEFAULT;
|
||||
|
||||
-- TODO: support VERBOSE
|
||||
-- VACUUM VERBOSE dustbunnies;
|
||||
-- VACUUM (FULL, VERBOSE) dustbunnies;
|
||||
-- ANALYZE VERBOSE dustbunnies;
|
||||
|
|
Loading…
Reference in New Issue