notice handler is implemented

pull/2121/head
mehmet furkan şahin 2018-04-24 10:43:02 +03:00
parent 304b3a41ba
commit a4153c6ab1
10 changed files with 129 additions and 24 deletions

View File

@ -25,6 +25,7 @@
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/hash_helpers.h" #include "distributed/hash_helpers.h"
#include "distributed/placement_connection.h" #include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "mb/pg_wchar.h" #include "mb/pg_wchar.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
@ -41,9 +42,13 @@ static uint32 ConnectionHashHash(const void *key, Size keysize);
static int ConnectionHashCompare(const void *a, const void *b, Size keysize); static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key); static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static void DefaultCitusNoticeProcessor(void *arg, const char *message);
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL;
/* /*
* Initialize per-backend connection management infrastructure. * Initialize per-backend connection management infrastructure.
*/ */
@ -663,6 +668,8 @@ StartConnectionEstablishment(ConnectionHashKey *key)
*/ */
PQsetnonblocking(connection->pgConn, true); PQsetnonblocking(connection->pgConn, true);
SetCitusNoticeProcessor(connection);
return connection; return connection;
} }
@ -766,3 +773,83 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
} }
} }
} }
/*
* SetCitusNoticeProcessor sets the NoticeProcessor to DefaultCitusNoticeProcessor
*/
void
SetCitusNoticeProcessor(MultiConnection *connection)
{
PQsetNoticeProcessor(connection->pgConn, DefaultCitusNoticeProcessor,
connection);
}
/*
* SetCitusNoticeLevel is used to set the notice level for distributed
* queries.
*/
void
SetCitusNoticeLevel(int level)
{
CitusNoticeLogLevel = level;
}
/*
* UnsetCitusNoticeLevel sets the CitusNoticeLogLevel back to
* its default value.
*/
void
UnsetCitusNoticeLevel()
{
CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL;
}
/*
* DefaultCitusNoticeProcessor is used to redirect worker notices
* from logfile to console.
*/
static void
DefaultCitusNoticeProcessor(void *arg, const char *message)
{
MultiConnection *connection = (MultiConnection *) arg;
char *nodeName = connection->hostname;
uint32 nodePort = connection->port;
char *chompedMessage = pchomp(message);
char *trimmedMessage = TrimLogLevel(chompedMessage);
char *level = strtok((char *) message, ":");
ereport(CitusNoticeLogLevel, (errmsg("%s", trimmedMessage),
errdetail("%s from %s:%d",
level, nodeName, nodePort)));
}
/*
* TrimLogLevel makes a copy of the string with the leading log level
* and spaces removed such as
* From:
* INFO: "normal2_102070": scanned 0 of 0 pages...
* To:
* "normal2_102070": scanned 0 of 0 pages...
*/
char *
TrimLogLevel(const char *message)
{
size_t n;
n = 0;
while (n < strlen(message) && message[n] != ':')
{
n++;
}
do {
n++;
} while (n < strlen(message) && message[n] == ' ');
return pstrdup(message + n);
}

View File

@ -1092,7 +1092,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
CoordinatedTransactionUse2PC(); CoordinatedTransactionUse2PC();
} }
if (firstTask->taskType == DDL_TASK) if (firstTask->taskType == DDL_TASK || firstTask->taskType == VACUUM_ANALYZE_TASK)
{ {
connectionFlags = FOR_DDL; connectionFlags = FOR_DDL;
} }
@ -1173,6 +1173,15 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
connection = (MultiConnection *) list_nth(connectionList, placementIndex); connection = (MultiConnection *) list_nth(connectionList, placementIndex);
/*
* if the task is a VACUUM or ANALYZE, we set CitusNoticeLogLevel to INFO
* to see the logs in console.
*/
if (task->taskType == VACUUM_ANALYZE_TASK)
{
SetCitusNoticeLevel(INFO);
}
/* /*
* If caller is interested, store query results the first time * If caller is interested, store query results the first time
* through. The output of the query's execution on other shards is * through. The output of the query's execution on other shards is
@ -1235,6 +1244,9 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
placementIndex++; placementIndex++;
} }
/* we should set the log level back to its default value since the task is done */
UnsetCitusNoticeLevel();
UnclaimAllShardConnections(shardConnectionHash); UnclaimAllShardConnections(shardConnectionHash);
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();

View File

@ -1718,12 +1718,6 @@ IsSupportedDistributedVacuumStmt(VacuumStmt *vacuumStmt, List *vacuumRelationIdL
"send targeted %s commands to worker nodes.", "send targeted %s commands to worker nodes.",
stmtName))); stmtName)));
} }
else if (vacuumStmt->options & VACOPT_VERBOSE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("the VERBOSE option is currently unsupported in "
"distributed %s commands", stmtName)));
}
else else
{ {
distributeStmt = true; distributeStmt = true;
@ -1784,7 +1778,7 @@ VacuumTaskList(Oid relationId, int vacuumOptions, List *vacuumColumnList)
task = CitusMakeNode(Task); task = CitusMakeNode(Task);
task->jobId = jobId; task->jobId = jobId;
task->taskId = taskId++; task->taskId = taskId++;
task->taskType = DDL_TASK; task->taskType = VACUUM_ANALYZE_TASK;
task->queryString = pstrdup(vacuumString->data); task->queryString = pstrdup(vacuumString->data);
task->dependedTaskList = NULL; task->dependedTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID; task->replicationModel = REPLICATION_MODEL_INVALID;
@ -1812,7 +1806,8 @@ DeparseVacuumStmtPrefix(int vacuumFlags)
VACOPT_ANALYZE | VACOPT_ANALYZE |
VACOPT_DISABLE_PAGE_SKIPPING | VACOPT_DISABLE_PAGE_SKIPPING |
VACOPT_FREEZE | VACOPT_FREEZE |
VACOPT_FULL VACOPT_FULL |
VACOPT_VERBOSE
); );
/* determine actual command and block out its bit */ /* determine actual command and block out its bit */
@ -1825,6 +1820,12 @@ DeparseVacuumStmtPrefix(int vacuumFlags)
{ {
appendStringInfoString(vacuumPrefix, "ANALYZE "); appendStringInfoString(vacuumPrefix, "ANALYZE ");
vacuumFlags &= ~VACOPT_ANALYZE; vacuumFlags &= ~VACOPT_ANALYZE;
if (vacuumFlags & VACOPT_VERBOSE)
{
appendStringInfoString(vacuumPrefix, "VERBOSE ");
vacuumFlags &= ~VACOPT_VERBOSE;
}
} }
/* unsupported flags should have already been rejected */ /* unsupported flags should have already been rejected */
@ -1859,6 +1860,11 @@ DeparseVacuumStmtPrefix(int vacuumFlags)
appendStringInfoString(vacuumPrefix, "FULL,"); appendStringInfoString(vacuumPrefix, "FULL,");
} }
if (vacuumFlags & VACOPT_VERBOSE)
{
appendStringInfoString(vacuumPrefix, "VERBOSE,");
}
vacuumPrefix->data[vacuumPrefix->len - 1] = ')'; vacuumPrefix->data[vacuumPrefix->len - 1] = ')';
appendStringInfoChar(vacuumPrefix, ' '); appendStringInfoChar(vacuumPrefix, ' ');

View File

@ -81,7 +81,7 @@ OpenTransactionsForAllTasks(List *taskList, int connectionFlags)
else else
{ {
/* can only open connections for DDL and DML commands */ /* can only open connections for DDL and DML commands */
Assert(task->taskType == DDL_TASK); Assert(task->taskType == DDL_TASK || VACUUM_ANALYZE_TASK);
accessType = PLACEMENT_ACCESS_DDL; accessType = PLACEMENT_ACCESS_DDL;
} }

View File

@ -237,6 +237,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
*/ */
SubPlanLevel = 0; SubPlanLevel = 0;
UnSetDistributedTransactionId(); UnSetDistributedTransactionId();
UnsetCitusNoticeLevel();
break; break;
} }
@ -352,6 +353,8 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
{ {
CoordinatedRemoteTransactionsSavepointRollback(subId); CoordinatedRemoteTransactionsSavepointRollback(subId);
} }
UnsetCitusNoticeLevel();
break; break;
} }

View File

@ -21,6 +21,9 @@
/* maximum (textual) lengths of hostname and port */ /* maximum (textual) lengths of hostname and port */
#define MAX_NODE_LENGTH 255 /* includes 0 byte */ #define MAX_NODE_LENGTH 255 /* includes 0 byte */
/* default notice level */
#define DEFAULT_CITUS_NOTICE_LEVEL DEBUG1
/* forward declare, to avoid forcing large headers on everyone */ /* forward declare, to avoid forcing large headers on everyone */
struct pg_conn; /* target of the PGconn typedef */ struct pg_conn; /* target of the PGconn typedef */
struct MemoryContextData; struct MemoryContextData;
@ -165,5 +168,11 @@ extern void FinishConnectionEstablishment(MultiConnection *connection);
extern void ClaimConnectionExclusively(MultiConnection *connection); extern void ClaimConnectionExclusively(MultiConnection *connection);
extern void UnclaimConnection(MultiConnection *connection); extern void UnclaimConnection(MultiConnection *connection);
/* dealing with notice handler */
extern void SetCitusNoticeProcessor(MultiConnection *connection);
extern void SetCitusNoticeLevel(int level);
extern char * TrimLogLevel(const char *message);
extern void UnsetCitusNoticeLevel(void);
#endif /* CONNECTION_MANAGMENT_H */ #endif /* CONNECTION_MANAGMENT_H */

View File

@ -84,7 +84,8 @@ typedef enum
MERGE_FETCH_TASK = 5, MERGE_FETCH_TASK = 5,
MODIFY_TASK = 6, MODIFY_TASK = 6,
ROUTER_TASK = 7, ROUTER_TASK = 7,
DDL_TASK = 8 DDL_TASK = 8,
VACUUM_ANALYZE_TASK = 9
} TaskType; } TaskType;

View File

@ -469,7 +469,3 @@ SELECT 1 FROM citus_create_restore_point('regression-test');
1 1
(1 row) (1 row)
-- TODO: support VERBOSE
-- VACUUM VERBOSE dustbunnies;
-- VACUUM (FULL, VERBOSE) dustbunnies;
-- ANALYZE VERBOSE dustbunnies;

View File

@ -472,7 +472,3 @@ SELECT 1 FROM citus_create_restore_point('regression-test');
1 1
(1 row) (1 row)
-- TODO: support VERBOSE
-- VACUUM VERBOSE dustbunnies;
-- VACUUM (FULL, VERBOSE) dustbunnies;
-- ANALYZE VERBOSE dustbunnies;

View File

@ -292,8 +292,3 @@ SELECT citus_truncate_trigger();
-- confirm that citus_create_restore_point works -- confirm that citus_create_restore_point works
SELECT 1 FROM citus_create_restore_point('regression-test'); SELECT 1 FROM citus_create_restore_point('regression-test');
-- TODO: support VERBOSE
-- VACUUM VERBOSE dustbunnies;
-- VACUUM (FULL, VERBOSE) dustbunnies;
-- ANALYZE VERBOSE dustbunnies;