mirror of https://github.com/citusdata/citus.git
Merge pull request #2121 from citusdata/vacuum_analyze_verbose_support
Add support for (VACUUM | ANALYZE) VERBOSEpull/2107/head
commit
ca2818b569
|
@ -25,6 +25,7 @@
|
|||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/hash_helpers.h"
|
||||
#include "distributed/placement_connection.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "mb/pg_wchar.h"
|
||||
#include "utils/hsearch.h"
|
||||
|
@ -41,9 +42,13 @@ static uint32 ConnectionHashHash(const void *key, Size keysize);
|
|||
static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
|
||||
static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key);
|
||||
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
|
||||
static void DefaultCitusNoticeProcessor(void *arg, const char *message);
|
||||
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
|
||||
|
||||
|
||||
static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL;
|
||||
|
||||
|
||||
/*
|
||||
* Initialize per-backend connection management infrastructure.
|
||||
*/
|
||||
|
@ -663,6 +668,8 @@ StartConnectionEstablishment(ConnectionHashKey *key)
|
|||
*/
|
||||
PQsetnonblocking(connection->pgConn, true);
|
||||
|
||||
SetCitusNoticeProcessor(connection);
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
|
@ -766,3 +773,83 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SetCitusNoticeProcessor sets the NoticeProcessor to DefaultCitusNoticeProcessor
|
||||
*/
|
||||
void
|
||||
SetCitusNoticeProcessor(MultiConnection *connection)
|
||||
{
|
||||
PQsetNoticeProcessor(connection->pgConn, DefaultCitusNoticeProcessor,
|
||||
connection);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SetCitusNoticeLevel is used to set the notice level for distributed
|
||||
* queries.
|
||||
*/
|
||||
void
|
||||
SetCitusNoticeLevel(int level)
|
||||
{
|
||||
CitusNoticeLogLevel = level;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* UnsetCitusNoticeLevel sets the CitusNoticeLogLevel back to
|
||||
* its default value.
|
||||
*/
|
||||
void
|
||||
UnsetCitusNoticeLevel()
|
||||
{
|
||||
CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DefaultCitusNoticeProcessor is used to redirect worker notices
|
||||
* from logfile to console.
|
||||
*/
|
||||
static void
|
||||
DefaultCitusNoticeProcessor(void *arg, const char *message)
|
||||
{
|
||||
MultiConnection *connection = (MultiConnection *) arg;
|
||||
char *nodeName = connection->hostname;
|
||||
uint32 nodePort = connection->port;
|
||||
char *chompedMessage = pchomp(message);
|
||||
char *trimmedMessage = TrimLogLevel(chompedMessage);
|
||||
char *level = strtok((char *) message, ":");
|
||||
|
||||
ereport(CitusNoticeLogLevel, (errmsg("%s", trimmedMessage),
|
||||
errdetail("%s from %s:%d",
|
||||
level, nodeName, nodePort)));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TrimLogLevel makes a copy of the string with the leading log level
|
||||
* and spaces removed such as
|
||||
* From:
|
||||
* INFO: "normal2_102070": scanned 0 of 0 pages...
|
||||
* To:
|
||||
* "normal2_102070": scanned 0 of 0 pages...
|
||||
*/
|
||||
char *
|
||||
TrimLogLevel(const char *message)
|
||||
{
|
||||
size_t n;
|
||||
|
||||
n = 0;
|
||||
while (n < strlen(message) && message[n] != ':')
|
||||
{
|
||||
n++;
|
||||
}
|
||||
|
||||
do {
|
||||
n++;
|
||||
} while (n < strlen(message) && message[n] == ' ');
|
||||
|
||||
return pstrdup(message + n);
|
||||
}
|
||||
|
|
|
@ -1092,7 +1092,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
|||
CoordinatedTransactionUse2PC();
|
||||
}
|
||||
|
||||
if (firstTask->taskType == DDL_TASK)
|
||||
if (firstTask->taskType == DDL_TASK || firstTask->taskType == VACUUM_ANALYZE_TASK)
|
||||
{
|
||||
connectionFlags = FOR_DDL;
|
||||
}
|
||||
|
@ -1173,6 +1173,15 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
|||
|
||||
connection = (MultiConnection *) list_nth(connectionList, placementIndex);
|
||||
|
||||
/*
|
||||
* if the task is a VACUUM or ANALYZE, we set CitusNoticeLogLevel to INFO
|
||||
* to see the logs in console.
|
||||
*/
|
||||
if (task->taskType == VACUUM_ANALYZE_TASK)
|
||||
{
|
||||
SetCitusNoticeLevel(INFO);
|
||||
}
|
||||
|
||||
/*
|
||||
* If caller is interested, store query results the first time
|
||||
* through. The output of the query's execution on other shards is
|
||||
|
@ -1235,6 +1244,9 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
|||
placementIndex++;
|
||||
}
|
||||
|
||||
/* we should set the log level back to its default value since the task is done */
|
||||
UnsetCitusNoticeLevel();
|
||||
|
||||
UnclaimAllShardConnections(shardConnectionHash);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
|
|
@ -128,8 +128,7 @@ static Node * WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
|
|||
static List * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
||||
const char *alterObjectSchemaCommand);
|
||||
static void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);
|
||||
static bool IsSupportedDistributedVacuumStmt(VacuumStmt *vacuumStmt,
|
||||
List *vacuumRelationIdList);
|
||||
static bool IsDistributedVacuumStmt(VacuumStmt *vacuumStmt, List *vacuumRelationIdList);
|
||||
static List * VacuumTaskList(Oid relationId, int vacuumOptions, List *vacuumColumnList);
|
||||
static StringInfo DeparseVacuumStmtPrefix(int vacuumFlags);
|
||||
static char * DeparseVacuumColumnNames(List *columnNameList);
|
||||
|
@ -1609,7 +1608,7 @@ static void
|
|||
ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
|
||||
{
|
||||
int relationIndex = 0;
|
||||
bool supportedVacuumStmt = false;
|
||||
bool distributedVacuumStmt = false;
|
||||
List *vacuumRelationList = ExtractVacuumTargetRels(vacuumStmt);
|
||||
ListCell *vacuumRelationCell = NULL;
|
||||
List *relationIdList = NIL;
|
||||
|
@ -1625,8 +1624,8 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
|
|||
relationIdList = lappend_oid(relationIdList, relationId);
|
||||
}
|
||||
|
||||
supportedVacuumStmt = IsSupportedDistributedVacuumStmt(vacuumStmt, relationIdList);
|
||||
if (!supportedVacuumStmt)
|
||||
distributedVacuumStmt = IsDistributedVacuumStmt(vacuumStmt, relationIdList);
|
||||
if (!distributedVacuumStmt)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
@ -1672,11 +1671,10 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
|
|||
* the list of tables targeted by the provided statement.
|
||||
*
|
||||
* Returns true if the statement requires distributed execution and returns
|
||||
* false otherwise; however, this function will raise errors if the provided
|
||||
* statement needs distributed execution but contains unsupported options.
|
||||
* false otherwise.
|
||||
*/
|
||||
static bool
|
||||
IsSupportedDistributedVacuumStmt(VacuumStmt *vacuumStmt, List *vacuumRelationIdList)
|
||||
IsDistributedVacuumStmt(VacuumStmt *vacuumStmt, List *vacuumRelationIdList)
|
||||
{
|
||||
const char *stmtName = (vacuumStmt->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
|
||||
bool distributeStmt = false;
|
||||
|
@ -1718,12 +1716,6 @@ IsSupportedDistributedVacuumStmt(VacuumStmt *vacuumStmt, List *vacuumRelationIdL
|
|||
"send targeted %s commands to worker nodes.",
|
||||
stmtName)));
|
||||
}
|
||||
else if (vacuumStmt->options & VACOPT_VERBOSE)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("the VERBOSE option is currently unsupported in "
|
||||
"distributed %s commands", stmtName)));
|
||||
}
|
||||
else
|
||||
{
|
||||
distributeStmt = true;
|
||||
|
@ -1784,7 +1776,7 @@ VacuumTaskList(Oid relationId, int vacuumOptions, List *vacuumColumnList)
|
|||
task = CitusMakeNode(Task);
|
||||
task->jobId = jobId;
|
||||
task->taskId = taskId++;
|
||||
task->taskType = DDL_TASK;
|
||||
task->taskType = VACUUM_ANALYZE_TASK;
|
||||
task->queryString = pstrdup(vacuumString->data);
|
||||
task->dependedTaskList = NULL;
|
||||
task->replicationModel = REPLICATION_MODEL_INVALID;
|
||||
|
@ -1812,7 +1804,8 @@ DeparseVacuumStmtPrefix(int vacuumFlags)
|
|||
VACOPT_ANALYZE |
|
||||
VACOPT_DISABLE_PAGE_SKIPPING |
|
||||
VACOPT_FREEZE |
|
||||
VACOPT_FULL
|
||||
VACOPT_FULL |
|
||||
VACOPT_VERBOSE
|
||||
);
|
||||
|
||||
/* determine actual command and block out its bit */
|
||||
|
@ -1825,6 +1818,12 @@ DeparseVacuumStmtPrefix(int vacuumFlags)
|
|||
{
|
||||
appendStringInfoString(vacuumPrefix, "ANALYZE ");
|
||||
vacuumFlags &= ~VACOPT_ANALYZE;
|
||||
|
||||
if (vacuumFlags & VACOPT_VERBOSE)
|
||||
{
|
||||
appendStringInfoString(vacuumPrefix, "VERBOSE ");
|
||||
vacuumFlags &= ~VACOPT_VERBOSE;
|
||||
}
|
||||
}
|
||||
|
||||
/* unsupported flags should have already been rejected */
|
||||
|
@ -1859,6 +1858,11 @@ DeparseVacuumStmtPrefix(int vacuumFlags)
|
|||
appendStringInfoString(vacuumPrefix, "FULL,");
|
||||
}
|
||||
|
||||
if (vacuumFlags & VACOPT_VERBOSE)
|
||||
{
|
||||
appendStringInfoString(vacuumPrefix, "VERBOSE,");
|
||||
}
|
||||
|
||||
vacuumPrefix->data[vacuumPrefix->len - 1] = ')';
|
||||
|
||||
appendStringInfoChar(vacuumPrefix, ' ');
|
||||
|
|
|
@ -81,7 +81,7 @@ OpenTransactionsForAllTasks(List *taskList, int connectionFlags)
|
|||
else
|
||||
{
|
||||
/* can only open connections for DDL and DML commands */
|
||||
Assert(task->taskType == DDL_TASK);
|
||||
Assert(task->taskType == DDL_TASK || VACUUM_ANALYZE_TASK);
|
||||
|
||||
accessType = PLACEMENT_ACCESS_DDL;
|
||||
}
|
||||
|
|
|
@ -237,6 +237,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
|||
*/
|
||||
SubPlanLevel = 0;
|
||||
UnSetDistributedTransactionId();
|
||||
UnsetCitusNoticeLevel();
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -352,6 +353,8 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
|
|||
{
|
||||
CoordinatedRemoteTransactionsSavepointRollback(subId);
|
||||
}
|
||||
|
||||
UnsetCitusNoticeLevel();
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -21,6 +21,9 @@
|
|||
/* maximum (textual) lengths of hostname and port */
|
||||
#define MAX_NODE_LENGTH 255 /* includes 0 byte */
|
||||
|
||||
/* default notice level */
|
||||
#define DEFAULT_CITUS_NOTICE_LEVEL DEBUG1
|
||||
|
||||
/* forward declare, to avoid forcing large headers on everyone */
|
||||
struct pg_conn; /* target of the PGconn typedef */
|
||||
struct MemoryContextData;
|
||||
|
@ -165,5 +168,11 @@ extern void FinishConnectionEstablishment(MultiConnection *connection);
|
|||
extern void ClaimConnectionExclusively(MultiConnection *connection);
|
||||
extern void UnclaimConnection(MultiConnection *connection);
|
||||
|
||||
/* dealing with notice handler */
|
||||
extern void SetCitusNoticeProcessor(MultiConnection *connection);
|
||||
extern void SetCitusNoticeLevel(int level);
|
||||
extern char * TrimLogLevel(const char *message);
|
||||
extern void UnsetCitusNoticeLevel(void);
|
||||
|
||||
|
||||
#endif /* CONNECTION_MANAGMENT_H */
|
||||
|
|
|
@ -84,7 +84,8 @@ typedef enum
|
|||
MERGE_FETCH_TASK = 5,
|
||||
MODIFY_TASK = 6,
|
||||
ROUTER_TASK = 7,
|
||||
DDL_TASK = 8
|
||||
DDL_TASK = 8,
|
||||
VACUUM_ANALYZE_TASK = 9
|
||||
} TaskType;
|
||||
|
||||
|
||||
|
|
|
@ -469,7 +469,3 @@ SELECT 1 FROM citus_create_restore_point('regression-test');
|
|||
1
|
||||
(1 row)
|
||||
|
||||
-- TODO: support VERBOSE
|
||||
-- VACUUM VERBOSE dustbunnies;
|
||||
-- VACUUM (FULL, VERBOSE) dustbunnies;
|
||||
-- ANALYZE VERBOSE dustbunnies;
|
||||
|
|
|
@ -472,7 +472,3 @@ SELECT 1 FROM citus_create_restore_point('regression-test');
|
|||
1
|
||||
(1 row)
|
||||
|
||||
-- TODO: support VERBOSE
|
||||
-- VACUUM VERBOSE dustbunnies;
|
||||
-- VACUUM (FULL, VERBOSE) dustbunnies;
|
||||
-- ANALYZE VERBOSE dustbunnies;
|
||||
|
|
|
@ -292,8 +292,3 @@ SELECT citus_truncate_trigger();
|
|||
|
||||
-- confirm that citus_create_restore_point works
|
||||
SELECT 1 FROM citus_create_restore_point('regression-test');
|
||||
|
||||
-- TODO: support VERBOSE
|
||||
-- VACUUM VERBOSE dustbunnies;
|
||||
-- VACUUM (FULL, VERBOSE) dustbunnies;
|
||||
-- ANALYZE VERBOSE dustbunnies;
|
||||
|
|
Loading…
Reference in New Issue