Issue worker messages with the same log level

pull/3017/head
Marco Slot 2020-04-10 20:41:26 +02:00
parent 132efdbc56
commit 8b83306a27
24 changed files with 512 additions and 116 deletions

View File

@ -30,6 +30,7 @@
#include "distributed/shard_pruning.h" #include "distributed/shard_pruning.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "distributed/worker_log_messages.h"
#include "optimizer/clauses.h" #include "optimizer/clauses.h"
#include "nodes/nodeFuncs.h" #include "nodes/nodeFuncs.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
@ -186,6 +187,8 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
.requires2PC = false .requires2PC = false
}; };
EnableWorkerMessagePropagation();
bool localExecutionSupported = true; bool localExecutionSupported = true;
ExecutionParams *executionParams = CreateBasicExecutionParams( ExecutionParams *executionParams = CreateBasicExecutionParams(
ROW_MODIFY_NONE, list_make1(task), MaxAdaptiveExecutorPoolSize, ROW_MODIFY_NONE, list_make1(task), MaxAdaptiveExecutorPoolSize,
@ -197,6 +200,8 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
executionParams->xactProperties = xactProperties; executionParams->xactProperties = xactProperties;
ExecuteTaskListExtended(executionParams); ExecuteTaskListExtended(executionParams);
DisableWorkerMessagePropagation();
while (tuplestore_gettupleslot(tupleStore, true, false, slot)) while (tuplestore_gettupleslot(tupleStore, true, false, slot))
{ {
if (!dest->receiveSlot(slot, dest)) if (!dest->receiveSlot(slot, dest))

View File

@ -21,6 +21,7 @@
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/errormessage.h" #include "distributed/errormessage.h"
#include "distributed/error_codes.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/log_utils.h" #include "distributed/log_utils.h"
#include "distributed/memutils.h" #include "distributed/memutils.h"
@ -32,6 +33,7 @@
#include "distributed/cancel_utils.h" #include "distributed/cancel_utils.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "distributed/worker_log_messages.h"
#include "mb/pg_wchar.h" #include "mb/pg_wchar.h"
#include "portability/instr_time.h" #include "portability/instr_time.h"
#include "storage/ipc.h" #include "storage/ipc.h"
@ -55,11 +57,11 @@ static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isC
static bool ShouldShutdownConnection(MultiConnection *connection, const int static bool ShouldShutdownConnection(MultiConnection *connection, const int
cachedConnectionCount); cachedConnectionCount);
static void ResetConnection(MultiConnection *connection); static void ResetConnection(MultiConnection *connection);
static void DefaultCitusNoticeProcessor(void *arg, const char *message);
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
static bool RemoteTransactionIdle(MultiConnection *connection); static bool RemoteTransactionIdle(MultiConnection *connection);
static int EventSetSizeForConnectionList(List *connections); static int EventSetSizeForConnectionList(List *connections);
/* types for async connection management */ /* types for async connection management */
enum MultiConnectionPhase enum MultiConnectionPhase
{ {
@ -83,9 +85,6 @@ static void CloseNotReadyMultiConnectionStates(List *connectionStates);
static uint32 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState); static uint32 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState);
static void CitusPQFinish(MultiConnection *connection); static void CitusPQFinish(MultiConnection *connection);
static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL;
/* /*
* Initialize per-backend connection management infrastructure. * Initialize per-backend connection management infrastructure.
*/ */
@ -1108,7 +1107,7 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key
*/ */
PQsetnonblocking(connection->pgConn, true); PQsetnonblocking(connection->pgConn, true);
SetCitusNoticeProcessor(connection); SetCitusNoticeReceiver(connection);
} }
@ -1281,72 +1280,3 @@ RemoteTransactionIdle(MultiConnection *connection)
return PQtransactionStatus(connection->pgConn) == PQTRANS_IDLE; return PQtransactionStatus(connection->pgConn) == PQTRANS_IDLE;
} }
/*
* SetCitusNoticeProcessor sets the NoticeProcessor to DefaultCitusNoticeProcessor
*/
void
SetCitusNoticeProcessor(MultiConnection *connection)
{
PQsetNoticeProcessor(connection->pgConn, DefaultCitusNoticeProcessor,
connection);
}
/*
* UnsetCitusNoticeLevel sets the CitusNoticeLogLevel back to
* its default value.
*/
void
UnsetCitusNoticeLevel()
{
CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL;
}
/*
* DefaultCitusNoticeProcessor is used to redirect worker notices
* from logfile to console.
*/
static void
DefaultCitusNoticeProcessor(void *arg, const char *message)
{
MultiConnection *connection = (MultiConnection *) arg;
char *nodeName = connection->hostname;
uint32 nodePort = connection->port;
char *trimmedMessage = TrimLogLevel(message);
char *strtokPosition;
char *level = strtok_r((char *) message, ":", &strtokPosition);
ereport(CitusNoticeLogLevel,
(errmsg("%s", ApplyLogRedaction(trimmedMessage)),
errdetail("%s from %s:%d", level, nodeName, nodePort)));
}
/*
* TrimLogLevel returns a copy of the string with the leading log level
* and spaces removed such as
* From:
* INFO: "normal2_102070": scanned 0 of 0 pages...
* To:
* "normal2_102070": scanned 0 of 0 pages...
*/
char *
TrimLogLevel(const char *message)
{
char *chompedMessage = pchomp(message);
size_t n = 0;
while (n < strlen(chompedMessage) && chompedMessage[n] != ':')
{
n++;
}
do {
n++;
} while (n < strlen(chompedMessage) && chompedMessage[n] == ' ');
return chompedMessage + n;
}

View File

@ -0,0 +1,256 @@
/*-------------------------------------------------------------------------
*
* worker_log_messages.c
* Logic for handling log messages from workers.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/connection_management.h"
#include "distributed/error_codes.h"
#include "distributed/errormessage.h"
#include "distributed/log_utils.h"
#include "distributed/worker_log_messages.h"
#include "utils/elog.h"
/*
* WorkerMinMessages reflects the value of the citus.worker_min_messages setting which
* control the minimum log level of messages from the worker that are propagated to the
* client and the log on the coordinator.
*/
int WorkerMinMessages = NOTICE;
/*
* PreserveWorkerMessageLogLevel specifies whether to propagate messages from workers
* to the client and the log on the coordinator with their original log level. When
* false, messages are propagated using DEBUG1.
*
* This flag used to suppress redundant notices in some commands (e.g. VACUUM, DROP
* TABLE).
*/
static bool PreserveWorkerMessageLogLevel = false;
/*
* WorkerErrorIndication can contain a warning that arrives to use from one session, but occurred
* because another session in the same distributed transaction threw an error. We store
* this warning in case we do not get an error, in which case the warning should have
* been an error (and usually indicates a bug).
*/
DeferredErrorMessage *WorkerErrorIndication = NULL;
/* list of log level names we might see from the worker */
static const char *LogLevelNames[] = {
"DEBUG",
"NOTICE",
"INFO",
"WARNING",
"ERROR",
"FATAL",
"PANIC",
NULL
};
/* postgres log level values corresponding to LogLevelNames */
static const int LogLevels[] = {
DEBUG1,
NOTICE,
INFO,
WARNING,
ERROR,
FATAL,
PANIC
};
static void DefaultCitusNoticeReceiver(void *arg, const PGresult *result);
static int LogLevelNameToLogLevel(char *levelName);
static char * TrimLogLevel(const char *message);
/*
* SetCitusNoticeReceiver sets the NoticeReceiver to DefaultCitusNoticeReceivere
*/
void
SetCitusNoticeReceiver(MultiConnection *connection)
{
PQsetNoticeReceiver(connection->pgConn, DefaultCitusNoticeReceiver,
connection);
}
/*
* EnableWorkerMessagePropagation indicates that we want to propagate messages
* from workers to the client using the same log level.
*/
void
EnableWorkerMessagePropagation(void)
{
PreserveWorkerMessageLogLevel = true;
}
/*
* DisableWorkerMessagePropagation indiciates that we want all messages from the
* workers to only be sent to the client as debug messages.
*/
void
DisableWorkerMessagePropagation(void)
{
PreserveWorkerMessageLogLevel = false;
}
/*
* DefaultCitusNoticeReceiver is used to redirect worker notices
* from logfile to console.
*/
static void
DefaultCitusNoticeReceiver(void *arg, const PGresult *result)
{
MultiConnection *connection = (MultiConnection *) arg;
char *nodeName = connection->hostname;
uint32 nodePort = connection->port;
char *message = PQresultErrorMessage(result);
char *trimmedMessage = TrimLogLevel(message);
char *levelName = PQresultErrorField(result, PG_DIAG_SEVERITY);
int logLevel = LogLevelNameToLogLevel(levelName);
int sqlState = ERRCODE_INTERNAL_ERROR;
char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
if (sqlStateString != NULL)
{
sqlState = MAKE_SQLSTATE(sqlStateString[0],
sqlStateString[1],
sqlStateString[2],
sqlStateString[3],
sqlStateString[4]);
}
/*
* When read_intermediate_result cannot find a file it might mean that the
* transaction that created the file already deleted it because it aborted.
* That's an expected situation, unless there is no actual error. We
* therefore store a DeferredError and raise it if we reach the end of
* execution without errors.
*/
if (sqlState == ERRCODE_CITUS_INTERMEDIATE_RESULT_NOT_FOUND && logLevel == WARNING)
{
if (WorkerErrorIndication == NULL)
{
/* we'll at most need this for the lifetime of the transaction */
MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
WorkerErrorIndication = DeferredError(sqlState, pstrdup(trimmedMessage),
NULL, NULL);
MemoryContextSwitchTo(oldContext);
}
/* if we get the error we're expecting, the user does not need to know */
logLevel = DEBUG4;
}
if (logLevel < WorkerMinMessages || WorkerMinMessages == CITUS_LOG_LEVEL_OFF)
{
/* user does not want to see message */
return;
}
if (!PreserveWorkerMessageLogLevel)
{
/*
* We sometimes want to suppress notices (e.g. DROP TABLE cascading),
* since the user already gets the relevant notices for the distributed
* table. In that case, we change the log level to DEBUG1.
*/
logLevel = DEBUG1;
}
ereport(logLevel,
(errcode(sqlState),
errmsg("%s", ApplyLogRedaction(trimmedMessage)),
errdetail("from %s:%d", nodeName, nodePort)));
}
/*
* TrimLogLevel returns a copy of the string with the leading log level
* and spaces removed such as
* From:
* INFO: "normal2_102070": scanned 0 of 0 pages...
* To:
* "normal2_102070": scanned 0 of 0 pages...
*/
static char *
TrimLogLevel(const char *message)
{
char *chompedMessage = pchomp(message);
size_t n = 0;
while (n < strlen(chompedMessage) && chompedMessage[n] != ':')
{
n++;
}
do {
n++;
} while (n < strlen(chompedMessage) && chompedMessage[n] == ' ');
return chompedMessage + n;
}
/*
* LogLevelNameToLogLevel translates the prefix of Postgres log messages
* back to a native log level.
*/
static int
LogLevelNameToLogLevel(char *levelName)
{
int levelIndex = 0;
while (LogLevelNames[levelIndex] != NULL)
{
if (strcmp(levelName, LogLevelNames[levelIndex]) == 0)
{
return LogLevels[levelIndex];
}
levelIndex++;
}
return DEBUG1;
}
/*
* ErrorIfWorkerErrorIndicationReceived throws the deferred error in
* WorkerErrorIndication, if any.
*
* A fatal warning arrives to us as a WARNING in one session, that is triggered
* by an ERROR in another session in the same distributed transaction. We therefore
* do not expect to throw it, unless there is a bug in Citus.
*/
void
ErrorIfWorkerErrorIndicationReceived(void)
{
if (WorkerErrorIndication != NULL)
{
RaiseDeferredError(WorkerErrorIndication, ERROR);
}
}
/*
* ResetWorkerErrorIndication resets the fatal warning if one was received.
*/
void
ResetWorkerErrorIndication(void)
{
WorkerErrorIndication = NULL;
}

View File

@ -1591,8 +1591,6 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution)
static void static void
FinishDistributedExecution(DistributedExecution *execution) FinishDistributedExecution(DistributedExecution *execution)
{ {
UnsetCitusNoticeLevel();
if (DistributedExecutionModifiesDatabase(execution)) if (DistributedExecutionModifiesDatabase(execution))
{ {
/* prevent copying shards in same transaction */ /* prevent copying shards in same transaction */

View File

@ -19,6 +19,7 @@
#include "distributed/citus_custom_scan.h" #include "distributed/citus_custom_scan.h"
#include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodefuncs.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/connection_management.h"
#include "distributed/deparse_shard_query.h" #include "distributed/deparse_shard_query.h"
#include "distributed/distributed_execution_locks.h" #include "distributed/distributed_execution_locks.h"
#include "distributed/insert_select_executor.h" #include "distributed/insert_select_executor.h"
@ -31,6 +32,7 @@
#include "distributed/multi_router_planner.h" #include "distributed/multi_router_planner.h"
#include "distributed/query_stats.h" #include "distributed/query_stats.h"
#include "distributed/subplan_execution.h" #include "distributed/subplan_execution.h"
#include "distributed/worker_log_messages.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "executor/executor.h" #include "executor/executor.h"
#include "nodes/makefuncs.h" #include "nodes/makefuncs.h"
@ -166,6 +168,12 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
CitusScanState *scanState = (CitusScanState *) node; CitusScanState *scanState = (CitusScanState *) node;
/*
* Make sure we can see notices during regular queries, which would typically
* be the result of a function that raises a notices being called.
*/
EnableWorkerMessagePropagation();
#if PG_VERSION_NUM >= PG_VERSION_12 #if PG_VERSION_NUM >= PG_VERSION_12
/* /*
@ -638,6 +646,15 @@ CitusEndScan(CustomScanState *node)
Const *partitionKeyConst = NULL; Const *partitionKeyConst = NULL;
char *partitionKeyString = NULL; char *partitionKeyString = NULL;
/* stop propagating notices */
DisableWorkerMessagePropagation();
/*
* Check whether we received warnings that should not have been
* ignored.
*/
ErrorIfWorkerErrorIndicationReceived();
if (workerJob != NULL) if (workerJob != NULL)
{ {
partitionKeyConst = workerJob->partitionKeyValue; partitionKeyConst = workerJob->partitionKeyValue;

View File

@ -20,6 +20,7 @@
#include "commands/copy.h" #include "commands/copy.h"
#include "distributed/commands/multi_copy.h" #include "distributed/commands/multi_copy.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/error_codes.h"
#include "distributed/intermediate_results.h" #include "distributed/intermediate_results.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
@ -810,7 +811,7 @@ ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat,
* we could try to read a non-existing file. That is most likely * we could try to read a non-existing file. That is most likely
* to happen during development. * to happen during development.
*/ */
ereport(WARNING, (errcode_for_file_access(), ereport(WARNING, (errcode(ERRCODE_CITUS_INTERMEDIATE_RESULT_NOT_FOUND),
errmsg("Query could not find the intermediate result file " errmsg("Query could not find the intermediate result file "
"\"%s\", it was mostly likely deleted due to an " "\"%s\", it was mostly likely deleted due to an "
"error in a parallel process within the same " "error in a parallel process within the same "

View File

@ -68,7 +68,7 @@
static List *plannerRestrictionContextList = NIL; static List *plannerRestrictionContextList = NIL;
int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log level */ int MultiTaskQueryLogLevel = CITUS_LOG_LEVEL_OFF; /* multi-task query log level */
static uint64 NextPlanId = 1; static uint64 NextPlanId = 1;
/* keep track of planner call stack levels */ /* keep track of planner call stack levels */
@ -1321,7 +1321,7 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
if (IsMultiTaskPlan(distributedPlan)) if (IsMultiTaskPlan(distributedPlan))
{ {
/* if it is not a single task executable plan, inform user according to the log level */ /* if it is not a single task executable plan, inform user according to the log level */
if (MultiTaskQueryLogLevel != MULTI_TASK_QUERY_INFO_OFF) if (MultiTaskQueryLogLevel != CITUS_LOG_LEVEL_OFF)
{ {
ereport(MultiTaskQueryLogLevel, (errmsg( ereport(MultiTaskQueryLogLevel, (errmsg(
"multi-task query about to be executed"), "multi-task query about to be executed"),

View File

@ -67,6 +67,7 @@
#include "distributed/task_tracker.h" #include "distributed/task_tracker.h"
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "distributed/transaction_recovery.h" #include "distributed/transaction_recovery.h"
#include "distributed/worker_log_messages.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "distributed/worker_shard_visibility.h" #include "distributed/worker_shard_visibility.h"
@ -164,10 +165,16 @@ static const struct config_enum_entry shard_commit_protocol_options[] = {
{ NULL, 0, false } { NULL, 0, false }
}; };
static const struct config_enum_entry multi_task_query_log_level_options[] = { static const struct config_enum_entry log_level_options[] = {
{ "off", MULTI_TASK_QUERY_INFO_OFF, false }, { "off", CITUS_LOG_LEVEL_OFF, false },
{ "debug", DEBUG2, false }, { "debug5", DEBUG5, false},
{ "debug4", DEBUG4, false},
{ "debug3", DEBUG3, false},
{ "debug2", DEBUG2, false},
{ "debug1", DEBUG1, false},
{ "debug", DEBUG2, true},
{ "log", LOG, false}, { "log", LOG, false},
{ "info", INFO, true},
{ "notice", NOTICE, false}, { "notice", NOTICE, false},
{ "warning", WARNING, false}, { "warning", WARNING, false},
{ "error", ERROR, false}, { "error", ERROR, false},
@ -624,6 +631,18 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL, GUC_NO_SHOW_ALL,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomEnumVariable(
"citus.worker_min_messages",
gettext_noop("Log messages from workers only if their log level is at or above "
"the configured level"),
NULL,
&WorkerMinMessages,
NOTICE,
log_level_options,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomBoolVariable( DefineCustomBoolVariable(
"citus.explain_distributed_queries", "citus.explain_distributed_queries",
gettext_noop("Enables Explain for distributed queries."), gettext_noop("Enables Explain for distributed queries."),
@ -1248,7 +1267,7 @@ RegisterCitusConfigVariables(void)
gettext_noop("Sets the level of multi task query execution log messages"), gettext_noop("Sets the level of multi task query execution log messages"),
NULL, NULL,
&MultiTaskQueryLogLevel, &MultiTaskQueryLogLevel,
MULTI_TASK_QUERY_INFO_OFF, multi_task_query_log_level_options, CITUS_LOG_LEVEL_OFF, log_level_options,
PGC_USERSET, PGC_USERSET,
GUC_STANDARD, GUC_STANDARD,
NULL, NULL, NULL); NULL, NULL, NULL);

View File

@ -34,6 +34,7 @@
#include "distributed/shared_connection_stats.h" #include "distributed/shared_connection_stats.h"
#include "distributed/subplan_execution.h" #include "distributed/subplan_execution.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "distributed/worker_log_messages.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
#include "utils/guc.h" #include "utils/guc.h"
#include "utils/memutils.h" #include "utils/memutils.h"
@ -263,6 +264,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
case XACT_EVENT_ABORT: case XACT_EVENT_ABORT:
{ {
/* stop propagating notices from workers, we know the query is failed */
DisableWorkerMessagePropagation();
/* /*
* FIXME: Add warning for the COORD_TRANS_COMMITTED case. That * FIXME: Add warning for the COORD_TRANS_COMMITTED case. That
* can be reached if this backend fails after the * can be reached if this backend fails after the
@ -328,7 +332,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
*/ */
SubPlanLevel = 0; SubPlanLevel = 0;
UnSetDistributedTransactionId(); UnSetDistributedTransactionId();
UnsetCitusNoticeLevel();
break; break;
} }
@ -451,6 +454,8 @@ ResetGlobalVariables()
activeSetStmts = NULL; activeSetStmts = NULL;
CoordinatedTransactionUses2PC = false; CoordinatedTransactionUses2PC = false;
TransactionModifiedNodeMetadata = false; TransactionModifiedNodeMetadata = false;
ResetWorkerErrorIndication();
} }
@ -511,13 +516,23 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
case SUBXACT_EVENT_ABORT_SUB: case SUBXACT_EVENT_ABORT_SUB:
{ {
/*
* Stop showing message for now, will re-enable when executing
* the next statement.
*/
DisableWorkerMessagePropagation();
/*
* Given that we aborted, worker error indications can be ignored.
*/
ResetWorkerErrorIndication();
if (InCoordinatedTransaction()) if (InCoordinatedTransaction())
{ {
CoordinatedRemoteTransactionsSavepointRollback(subId); CoordinatedRemoteTransactionsSavepointRollback(subId);
} }
PopSubXact(subId); PopSubXact(subId);
UnsetCitusNoticeLevel();
break; break;
} }

View File

@ -27,9 +27,6 @@
/* used for libpq commands that get an error buffer. Postgres docs recommend 256. */ /* used for libpq commands that get an error buffer. Postgres docs recommend 256. */
#define ERROR_BUFFER_SIZE 256 #define ERROR_BUFFER_SIZE 256
/* default notice level */
#define DEFAULT_CITUS_NOTICE_LEVEL DEBUG1
/* application name used for internal connections in Citus */ /* application name used for internal connections in Citus */
#define CITUS_APPLICATION_NAME "citus" #define CITUS_APPLICATION_NAME "citus"
@ -247,11 +244,6 @@ extern void FinishConnectionEstablishment(MultiConnection *connection);
extern void ClaimConnectionExclusively(MultiConnection *connection); extern void ClaimConnectionExclusively(MultiConnection *connection);
extern void UnclaimConnection(MultiConnection *connection); extern void UnclaimConnection(MultiConnection *connection);
/* dealing with notice handler */
extern void SetCitusNoticeProcessor(MultiConnection *connection);
extern char * TrimLogLevel(const char *message);
extern void UnsetCitusNoticeLevel(void);
/* time utilities */ /* time utilities */
extern double MillisecondsPassedSince(instr_time moment); extern double MillisecondsPassedSince(instr_time moment);
extern long MillisecondsToTimeout(instr_time start, long msAfterStart); extern long MillisecondsToTimeout(instr_time start, long msAfterStart);

View File

@ -30,7 +30,6 @@
/* values used by jobs and tasks which do not require identifiers */ /* values used by jobs and tasks which do not require identifiers */
#define INVALID_JOB_ID 0 #define INVALID_JOB_ID 0
#define INVALID_TASK_ID 0 #define INVALID_TASK_ID 0
#define MULTI_TASK_QUERY_INFO_OFF 0 /* do not log multi-task queries */
#define CURSOR_OPT_FORCE_DISTRIBUTED 0x080000 #define CURSOR_OPT_FORCE_DISTRIBUTED 0x080000

View File

@ -0,0 +1,21 @@
/*-------------------------------------------------------------------------
*
* error_codes.h
* Error codes that are specific to Citus
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef CITUS_ERROR_CODES_H
#define CITUS_ERROR_CODES_H
#include "utils/elog.h"
#define ERRCODE_CITUS_INTERMEDIATE_RESULT_NOT_FOUND MAKE_SQLSTATE('C', 'I', 'I', 'N', 'F')
#endif /* CITUS_ERROR_CODES_H */

View File

@ -12,6 +12,9 @@
#include "utils/guc.h" #include "utils/guc.h"
/* do not log */
#define CITUS_LOG_LEVEL_OFF 0
extern bool IsLoggableLevel(int logLevel); extern bool IsLoggableLevel(int logLevel);
extern char * HashLogMessage(const char *text); extern char * HashLogMessage(const char *text);

View File

@ -0,0 +1,28 @@
/*-------------------------------------------------------------------------
*
* worker_log_messages.h
* Functions for handling log messages from the workers.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef WORKER_LOG_MESSAGES_H
#define WORKER_LOG_MESSAGES_H
#include "distributed/connection_management.h"
/* minimum log level for worker messages to be propagated */
extern int WorkerMinMessages;
void SetCitusNoticeReceiver(MultiConnection *connection);
void EnableWorkerMessagePropagation(void);
void DisableWorkerMessagePropagation(void);
void ErrorIfWorkerErrorIndicationReceived(void);
void ResetWorkerErrorIndication(void);
#endif /* WORKER_LOG_MESSAGES_H */

View File

@ -13,6 +13,48 @@ CREATE SCHEMA function_tests AUTHORIZATION functionuser;
CREATE SCHEMA function_tests2 AUTHORIZATION functionuser; CREATE SCHEMA function_tests2 AUTHORIZATION functionuser;
SET search_path TO function_tests; SET search_path TO function_tests;
SET citus.shard_count TO 4; SET citus.shard_count TO 4;
-- test notice
CREATE TABLE notices (
id int primary key,
message text
);
SELECT create_distributed_table('notices', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO notices VALUES (1, 'hello world');
CREATE FUNCTION notice(text)
RETURNS void
LANGUAGE plpgsql AS $$
BEGIN
RAISE NOTICE '%', $1;
END;
$$;
SELECT create_distributed_function('notice(text)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT notice(message) FROM notices WHERE id = 1;
NOTICE: hello world
DETAIL: from localhost:xxxxx
notice
---------------------------------------------------------------------
(1 row)
-- should not see a NOTICE if worker_min_messages is WARNING
SET citus.worker_min_messages TO WARNING;
SELECT notice(message) FROM notices WHERE id = 1;
notice
---------------------------------------------------------------------
(1 row)
RESET citus.worker_min_messages;
-- Create and distribute a simple function -- Create and distribute a simple function
CREATE FUNCTION eq(macaddr, macaddr) RETURNS bool CREATE FUNCTION eq(macaddr, macaddr) RETURNS bool
AS 'select $1 = $2;' AS 'select $1 = $2;'

View File

@ -94,12 +94,16 @@ FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares',
WHERE user_id = 'jon' OR true WHERE user_id = 'jon' OR true
ORDER BY x; ORDER BY x;
DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Query could not find the intermediate result file "squares", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction ERROR: Query could not find the intermediate result file "squares", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
DETAIL: WARNING from localhost:xxxxx SET client_min_messages TO DEBUG;
x | x2 SELECT x, x2
--------------------------------------------------------------------- FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in)
(0 rows) WHERE user_id = 'jon'
ORDER BY x;
DEBUG: Creating router plan
DEBUG: Plan is router executable
DETAIL: distribution column value: jon
ERROR: Query could not find the intermediate result file "squares", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
RESET client_min_messages; RESET client_min_messages;
-- try to read the file as text, will fail because of binary encoding -- try to read the file as text, will fail because of binary encoding
BEGIN; BEGIN;

View File

@ -5,17 +5,17 @@ SET citus.next_shard_id TO 1570000;
SET citus.replicate_reference_tables_on_activate TO off; SET citus.replicate_reference_tables_on_activate TO off;
SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); SELECT * FROM master_add_node('localhost', :master_port, groupid := 0);
DEBUG: schema "public" already exists, skipping DEBUG: schema "public" already exists, skipping
DETAIL: NOTICE from localhost:xxxxx DETAIL: from localhost:xxxxx
DEBUG: extension "plpgsql" already exists, skipping DEBUG: extension "plpgsql" already exists, skipping
DETAIL: NOTICE from localhost:xxxxx DETAIL: from localhost:xxxxx
DEBUG: schema "citus_mx_test_schema" already exists, skipping DEBUG: schema "citus_mx_test_schema" already exists, skipping
DETAIL: NOTICE from localhost:xxxxx DETAIL: from localhost:xxxxx
DEBUG: schema "citus_mx_test_schema_join_1" already exists, skipping DEBUG: schema "citus_mx_test_schema_join_1" already exists, skipping
DETAIL: NOTICE from localhost:xxxxx DETAIL: from localhost:xxxxx
DEBUG: schema "citus_mx_test_schema_join_2" already exists, skipping DEBUG: schema "citus_mx_test_schema_join_2" already exists, skipping
DETAIL: NOTICE from localhost:xxxxx DETAIL: from localhost:xxxxx
DEBUG: schema "citus_mx_schema_for_xacts" already exists, skipping DEBUG: schema "citus_mx_schema_for_xacts" already exists, skipping
DETAIL: NOTICE from localhost:xxxxx DETAIL: from localhost:xxxxx
master_add_node master_add_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
32 32

View File

@ -180,10 +180,46 @@ SELECT create_distributed_function('stable_squared(int)');
UPDATE example SET value = timestamp '10-10-2000 00:00' UPDATE example SET value = timestamp '10-10-2000 00:00'
FROM (SELECT key, stable_squared(count(*)::int) y FROM example GROUP BY key) a WHERE example.key = a.key; FROM (SELECT key, stable_squared(count(*)::int) y FROM example GROUP BY key) a WHERE example.key = a.key;
NOTICE: stable_fn called
DETAIL: from localhost:xxxxx
NOTICE: stable_fn called
DETAIL: from localhost:xxxxx
NOTICE: stable_fn called
DETAIL: from localhost:xxxxx
NOTICE: stable_fn called
DETAIL: from localhost:xxxxx
NOTICE: stable_fn called
DETAIL: from localhost:xxxxx
NOTICE: stable_fn called
DETAIL: from localhost:xxxxx
UPDATE example SET value = timestamp '10-10-2000 00:00' UPDATE example SET value = timestamp '10-10-2000 00:00'
FROM (SELECT key, stable_squared((count(*) OVER ())::int) y FROM example GROUP BY key) a WHERE example.key = a.key; FROM (SELECT key, stable_squared((count(*) OVER ())::int) y FROM example GROUP BY key) a WHERE example.key = a.key;
NOTICE: stable_fn called
DETAIL: from localhost:xxxxx
NOTICE: stable_fn called
DETAIL: from localhost:xxxxx
NOTICE: stable_fn called
DETAIL: from localhost:xxxxx
NOTICE: stable_fn called
DETAIL: from localhost:xxxxx
NOTICE: stable_fn called
DETAIL: from localhost:xxxxx
NOTICE: stable_fn called
DETAIL: from localhost:xxxxx
UPDATE example SET value = timestamp '10-10-2000 00:00' UPDATE example SET value = timestamp '10-10-2000 00:00'
FROM (SELECT key, stable_squared(grouping(key)) y FROM example GROUP BY key) a WHERE example.key = a.key; FROM (SELECT key, stable_squared(grouping(key)) y FROM example GROUP BY key) a WHERE example.key = a.key;
NOTICE: stable_fn called
DETAIL: from localhost:xxxxx
NOTICE: stable_fn called
DETAIL: from localhost:xxxxx
NOTICE: stable_fn called
DETAIL: from localhost:xxxxx
NOTICE: stable_fn called
DETAIL: from localhost:xxxxx
NOTICE: stable_fn called
DETAIL: from localhost:xxxxx
NOTICE: stable_fn called
DETAIL: from localhost:xxxxx
DROP SCHEMA multi_function_evaluation CASCADE; DROP SCHEMA multi_function_evaluation CASCADE;
NOTICE: drop cascades to 5 other objects NOTICE: drop cascades to 5 other objects
DETAIL: drop cascades to table example DETAIL: drop cascades to table example

View File

@ -385,7 +385,7 @@ DETAIL: A distributed function is created. To make sure subsequent commands see
\set VERBOSITY terse \set VERBOSITY terse
call multi_mx_call.mx_call_proc_raise(2); call multi_mx_call.mx_call_proc_raise(2);
DEBUG: pushing down the procedure DEBUG: pushing down the procedure
DEBUG: warning WARNING: warning
ERROR: error ERROR: error
\set VERBOSITY default \set VERBOSITY default
-- Test that we don't propagate to non-metadata worker nodes -- Test that we don't propagate to non-metadata worker nodes

View File

@ -406,7 +406,7 @@ DETAIL: A distributed function is created. To make sure subsequent commands see
\set VERBOSITY terse \set VERBOSITY terse
select mx_call_func_raise(2); select mx_call_func_raise(2);
DEBUG: pushing down the function call DEBUG: pushing down the function call
DEBUG: warning WARNING: warning
ERROR: error ERROR: error
\set VERBOSITY default \set VERBOSITY default
-- Don't push-down when doing INSERT INTO ... SELECT func(); -- Don't push-down when doing INSERT INTO ... SELECT func();

View File

@ -461,6 +461,7 @@ if ($followercluster)
# shard_count to 4 to speed up the tests. # shard_count to 4 to speed up the tests.
if($isolationtester) if($isolationtester)
{ {
push(@pgOptions, '-c', "citus.worker_min_messages='warning'");
push(@pgOptions, '-c', "citus.log_distributed_deadlock_detection=on"); push(@pgOptions, '-c', "citus.log_distributed_deadlock_detection=on");
push(@pgOptions, '-c', "citus.distributed_deadlock_detection_factor=-1"); push(@pgOptions, '-c', "citus.distributed_deadlock_detection_factor=-1");
push(@pgOptions, '-c', "citus.shard_count=4"); push(@pgOptions, '-c', "citus.shard_count=4");

View File

@ -9,6 +9,29 @@ CREATE SCHEMA function_tests2 AUTHORIZATION functionuser;
SET search_path TO function_tests; SET search_path TO function_tests;
SET citus.shard_count TO 4; SET citus.shard_count TO 4;
-- test notice
CREATE TABLE notices (
id int primary key,
message text
);
SELECT create_distributed_table('notices', 'id');
INSERT INTO notices VALUES (1, 'hello world');
CREATE FUNCTION notice(text)
RETURNS void
LANGUAGE plpgsql AS $$
BEGIN
RAISE NOTICE '%', $1;
END;
$$;
SELECT create_distributed_function('notice(text)');
SELECT notice(message) FROM notices WHERE id = 1;
-- should not see a NOTICE if worker_min_messages is WARNING
SET citus.worker_min_messages TO WARNING;
SELECT notice(message) FROM notices WHERE id = 1;
RESET citus.worker_min_messages;
-- Create and distribute a simple function -- Create and distribute a simple function
CREATE FUNCTION eq(macaddr, macaddr) RETURNS bool CREATE FUNCTION eq(macaddr, macaddr) RETURNS bool
AS 'select $1 = $2;' AS 'select $1 = $2;'

View File

@ -51,6 +51,12 @@ FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares',
WHERE user_id = 'jon' OR true WHERE user_id = 'jon' OR true
ORDER BY x; ORDER BY x;
SET client_min_messages TO DEBUG;
SELECT x, x2
FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in)
WHERE user_id = 'jon'
ORDER BY x;
RESET client_min_messages; RESET client_min_messages;
-- try to read the file as text, will fail because of binary encoding -- try to read the file as text, will fail because of binary encoding
BEGIN; BEGIN;