From 8b83306a27cb516e4c5580bf4f041895dc8a682e Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 10 Apr 2020 20:41:26 +0200 Subject: [PATCH] Issue worker messages with the same log level --- src/backend/distributed/commands/call.c | 5 + .../connection/connection_management.c | 78 +----- .../connection/worker_log_messages.c | 256 ++++++++++++++++++ .../distributed/executor/adaptive_executor.c | 2 - .../distributed/executor/citus_custom_scan.c | 17 ++ .../executor/intermediate_results.c | 3 +- .../distributed/planner/distributed_planner.c | 4 +- src/backend/distributed/shared_library_init.c | 37 ++- .../transaction/transaction_management.c | 19 +- .../distributed/connection_management.h | 8 - src/include/distributed/distributed_planner.h | 1 - src/include/distributed/error_codes.h | 21 ++ src/include/distributed/log_utils.h | 3 + src/include/distributed/worker_log_messages.h | 28 ++ .../expected/distributed_functions.out | 42 +++ .../regress/expected/intermediate_results.out | 16 +- .../regress/expected/local_shard_copy.out | 12 +- .../expected/multi_function_evaluation.out | 36 +++ .../expected/multi_modifying_xacts.out | 6 +- src/test/regress/expected/multi_mx_call.out | 2 +- .../multi_mx_function_call_delegation.out | 2 +- src/test/regress/pg_regress_multi.pl | 1 + .../regress/sql/distributed_functions.sql | 23 ++ src/test/regress/sql/intermediate_results.sql | 6 + 24 files changed, 512 insertions(+), 116 deletions(-) create mode 100644 src/backend/distributed/connection/worker_log_messages.c create mode 100644 src/include/distributed/error_codes.h create mode 100644 src/include/distributed/worker_log_messages.h diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index d50988f8e..fde187ded 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -30,6 +30,7 @@ #include "distributed/shard_pruning.h" #include "distributed/version_compat.h" #include "distributed/worker_manager.h" +#include "distributed/worker_log_messages.h" #include "optimizer/clauses.h" #include "nodes/nodeFuncs.h" #include "nodes/parsenodes.h" @@ -186,6 +187,8 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, .requires2PC = false }; + EnableWorkerMessagePropagation(); + bool localExecutionSupported = true; ExecutionParams *executionParams = CreateBasicExecutionParams( ROW_MODIFY_NONE, list_make1(task), MaxAdaptiveExecutorPoolSize, @@ -197,6 +200,8 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, executionParams->xactProperties = xactProperties; ExecuteTaskListExtended(executionParams); + DisableWorkerMessagePropagation(); + while (tuplestore_gettupleslot(tupleStore, true, false, slot)) { if (!dest->receiveSlot(slot, dest)) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index fcdceea8b..8495f5646 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -21,6 +21,7 @@ #include "commands/dbcommands.h" #include "distributed/connection_management.h" #include "distributed/errormessage.h" +#include "distributed/error_codes.h" #include "distributed/listutils.h" #include "distributed/log_utils.h" #include "distributed/memutils.h" @@ -32,6 +33,7 @@ #include "distributed/cancel_utils.h" #include "distributed/remote_commands.h" #include "distributed/version_compat.h" +#include "distributed/worker_log_messages.h" #include "mb/pg_wchar.h" #include "portability/instr_time.h" #include "storage/ipc.h" @@ -55,11 +57,11 @@ static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isC static bool ShouldShutdownConnection(MultiConnection *connection, const int cachedConnectionCount); static void ResetConnection(MultiConnection *connection); -static void DefaultCitusNoticeProcessor(void *arg, const char *message); static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); static bool RemoteTransactionIdle(MultiConnection *connection); static int EventSetSizeForConnectionList(List *connections); + /* types for async connection management */ enum MultiConnectionPhase { @@ -83,9 +85,6 @@ static void CloseNotReadyMultiConnectionStates(List *connectionStates); static uint32 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState); static void CitusPQFinish(MultiConnection *connection); -static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL; - - /* * Initialize per-backend connection management infrastructure. */ @@ -1108,7 +1107,7 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key */ PQsetnonblocking(connection->pgConn, true); - SetCitusNoticeProcessor(connection); + SetCitusNoticeReceiver(connection); } @@ -1281,72 +1280,3 @@ RemoteTransactionIdle(MultiConnection *connection) return PQtransactionStatus(connection->pgConn) == PQTRANS_IDLE; } - - -/* - * SetCitusNoticeProcessor sets the NoticeProcessor to DefaultCitusNoticeProcessor - */ -void -SetCitusNoticeProcessor(MultiConnection *connection) -{ - PQsetNoticeProcessor(connection->pgConn, DefaultCitusNoticeProcessor, - connection); -} - - -/* - * UnsetCitusNoticeLevel sets the CitusNoticeLogLevel back to - * its default value. - */ -void -UnsetCitusNoticeLevel() -{ - CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL; -} - - -/* - * DefaultCitusNoticeProcessor is used to redirect worker notices - * from logfile to console. - */ -static void -DefaultCitusNoticeProcessor(void *arg, const char *message) -{ - MultiConnection *connection = (MultiConnection *) arg; - char *nodeName = connection->hostname; - uint32 nodePort = connection->port; - char *trimmedMessage = TrimLogLevel(message); - char *strtokPosition; - char *level = strtok_r((char *) message, ":", &strtokPosition); - - ereport(CitusNoticeLogLevel, - (errmsg("%s", ApplyLogRedaction(trimmedMessage)), - errdetail("%s from %s:%d", level, nodeName, nodePort))); -} - - -/* - * TrimLogLevel returns a copy of the string with the leading log level - * and spaces removed such as - * From: - * INFO: "normal2_102070": scanned 0 of 0 pages... - * To: - * "normal2_102070": scanned 0 of 0 pages... - */ -char * -TrimLogLevel(const char *message) -{ - char *chompedMessage = pchomp(message); - - size_t n = 0; - while (n < strlen(chompedMessage) && chompedMessage[n] != ':') - { - n++; - } - - do { - n++; - } while (n < strlen(chompedMessage) && chompedMessage[n] == ' '); - - return chompedMessage + n; -} diff --git a/src/backend/distributed/connection/worker_log_messages.c b/src/backend/distributed/connection/worker_log_messages.c new file mode 100644 index 000000000..b822f5e83 --- /dev/null +++ b/src/backend/distributed/connection/worker_log_messages.c @@ -0,0 +1,256 @@ +/*------------------------------------------------------------------------- + * + * worker_log_messages.c + * Logic for handling log messages from workers. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "distributed/connection_management.h" +#include "distributed/error_codes.h" +#include "distributed/errormessage.h" +#include "distributed/log_utils.h" +#include "distributed/worker_log_messages.h" +#include "utils/elog.h" + + +/* + * WorkerMinMessages reflects the value of the citus.worker_min_messages setting which + * control the minimum log level of messages from the worker that are propagated to the + * client and the log on the coordinator. + */ +int WorkerMinMessages = NOTICE; + +/* + * PreserveWorkerMessageLogLevel specifies whether to propagate messages from workers + * to the client and the log on the coordinator with their original log level. When + * false, messages are propagated using DEBUG1. + * + * This flag used to suppress redundant notices in some commands (e.g. VACUUM, DROP + * TABLE). + */ +static bool PreserveWorkerMessageLogLevel = false; + +/* + * WorkerErrorIndication can contain a warning that arrives to use from one session, but occurred + * because another session in the same distributed transaction threw an error. We store + * this warning in case we do not get an error, in which case the warning should have + * been an error (and usually indicates a bug). + */ +DeferredErrorMessage *WorkerErrorIndication = NULL; + +/* list of log level names we might see from the worker */ +static const char *LogLevelNames[] = { + "DEBUG", + "NOTICE", + "INFO", + "WARNING", + "ERROR", + "FATAL", + "PANIC", + NULL +}; + +/* postgres log level values corresponding to LogLevelNames */ +static const int LogLevels[] = { + DEBUG1, + NOTICE, + INFO, + WARNING, + ERROR, + FATAL, + PANIC +}; + + +static void DefaultCitusNoticeReceiver(void *arg, const PGresult *result); +static int LogLevelNameToLogLevel(char *levelName); +static char * TrimLogLevel(const char *message); + + +/* + * SetCitusNoticeReceiver sets the NoticeReceiver to DefaultCitusNoticeReceivere + */ +void +SetCitusNoticeReceiver(MultiConnection *connection) +{ + PQsetNoticeReceiver(connection->pgConn, DefaultCitusNoticeReceiver, + connection); +} + + +/* + * EnableWorkerMessagePropagation indicates that we want to propagate messages + * from workers to the client using the same log level. + */ +void +EnableWorkerMessagePropagation(void) +{ + PreserveWorkerMessageLogLevel = true; +} + + +/* + * DisableWorkerMessagePropagation indiciates that we want all messages from the + * workers to only be sent to the client as debug messages. + */ +void +DisableWorkerMessagePropagation(void) +{ + PreserveWorkerMessageLogLevel = false; +} + + +/* + * DefaultCitusNoticeReceiver is used to redirect worker notices + * from logfile to console. + */ +static void +DefaultCitusNoticeReceiver(void *arg, const PGresult *result) +{ + MultiConnection *connection = (MultiConnection *) arg; + char *nodeName = connection->hostname; + uint32 nodePort = connection->port; + char *message = PQresultErrorMessage(result); + char *trimmedMessage = TrimLogLevel(message); + char *levelName = PQresultErrorField(result, PG_DIAG_SEVERITY); + int logLevel = LogLevelNameToLogLevel(levelName); + int sqlState = ERRCODE_INTERNAL_ERROR; + char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE); + + if (sqlStateString != NULL) + { + sqlState = MAKE_SQLSTATE(sqlStateString[0], + sqlStateString[1], + sqlStateString[2], + sqlStateString[3], + sqlStateString[4]); + } + + /* + * When read_intermediate_result cannot find a file it might mean that the + * transaction that created the file already deleted it because it aborted. + * That's an expected situation, unless there is no actual error. We + * therefore store a DeferredError and raise it if we reach the end of + * execution without errors. + */ + if (sqlState == ERRCODE_CITUS_INTERMEDIATE_RESULT_NOT_FOUND && logLevel == WARNING) + { + if (WorkerErrorIndication == NULL) + { + /* we'll at most need this for the lifetime of the transaction */ + MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext); + + WorkerErrorIndication = DeferredError(sqlState, pstrdup(trimmedMessage), + NULL, NULL); + + MemoryContextSwitchTo(oldContext); + } + + /* if we get the error we're expecting, the user does not need to know */ + logLevel = DEBUG4; + } + + if (logLevel < WorkerMinMessages || WorkerMinMessages == CITUS_LOG_LEVEL_OFF) + { + /* user does not want to see message */ + return; + } + + if (!PreserveWorkerMessageLogLevel) + { + /* + * We sometimes want to suppress notices (e.g. DROP TABLE cascading), + * since the user already gets the relevant notices for the distributed + * table. In that case, we change the log level to DEBUG1. + */ + logLevel = DEBUG1; + } + + ereport(logLevel, + (errcode(sqlState), + errmsg("%s", ApplyLogRedaction(trimmedMessage)), + errdetail("from %s:%d", nodeName, nodePort))); +} + + +/* + * TrimLogLevel returns a copy of the string with the leading log level + * and spaces removed such as + * From: + * INFO: "normal2_102070": scanned 0 of 0 pages... + * To: + * "normal2_102070": scanned 0 of 0 pages... + */ +static char * +TrimLogLevel(const char *message) +{ + char *chompedMessage = pchomp(message); + + size_t n = 0; + while (n < strlen(chompedMessage) && chompedMessage[n] != ':') + { + n++; + } + + do { + n++; + } while (n < strlen(chompedMessage) && chompedMessage[n] == ' '); + + return chompedMessage + n; +} + + +/* + * LogLevelNameToLogLevel translates the prefix of Postgres log messages + * back to a native log level. + */ +static int +LogLevelNameToLogLevel(char *levelName) +{ + int levelIndex = 0; + + while (LogLevelNames[levelIndex] != NULL) + { + if (strcmp(levelName, LogLevelNames[levelIndex]) == 0) + { + return LogLevels[levelIndex]; + } + + levelIndex++; + } + + return DEBUG1; +} + + +/* + * ErrorIfWorkerErrorIndicationReceived throws the deferred error in + * WorkerErrorIndication, if any. + * + * A fatal warning arrives to us as a WARNING in one session, that is triggered + * by an ERROR in another session in the same distributed transaction. We therefore + * do not expect to throw it, unless there is a bug in Citus. + */ +void +ErrorIfWorkerErrorIndicationReceived(void) +{ + if (WorkerErrorIndication != NULL) + { + RaiseDeferredError(WorkerErrorIndication, ERROR); + } +} + + +/* + * ResetWorkerErrorIndication resets the fatal warning if one was received. + */ +void +ResetWorkerErrorIndication(void) +{ + WorkerErrorIndication = NULL; +} diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 479048bcf..4c8beb97c 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1591,8 +1591,6 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution) static void FinishDistributedExecution(DistributedExecution *execution) { - UnsetCitusNoticeLevel(); - if (DistributedExecutionModifiesDatabase(execution)) { /* prevent copying shards in same transaction */ diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 947c8778f..5556b512d 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -19,6 +19,7 @@ #include "distributed/citus_custom_scan.h" #include "distributed/citus_nodefuncs.h" #include "distributed/citus_ruleutils.h" +#include "distributed/connection_management.h" #include "distributed/deparse_shard_query.h" #include "distributed/distributed_execution_locks.h" #include "distributed/insert_select_executor.h" @@ -31,6 +32,7 @@ #include "distributed/multi_router_planner.h" #include "distributed/query_stats.h" #include "distributed/subplan_execution.h" +#include "distributed/worker_log_messages.h" #include "distributed/worker_protocol.h" #include "executor/executor.h" #include "nodes/makefuncs.h" @@ -166,6 +168,12 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags) CitusScanState *scanState = (CitusScanState *) node; + /* + * Make sure we can see notices during regular queries, which would typically + * be the result of a function that raises a notices being called. + */ + EnableWorkerMessagePropagation(); + #if PG_VERSION_NUM >= PG_VERSION_12 /* @@ -638,6 +646,15 @@ CitusEndScan(CustomScanState *node) Const *partitionKeyConst = NULL; char *partitionKeyString = NULL; + /* stop propagating notices */ + DisableWorkerMessagePropagation(); + + /* + * Check whether we received warnings that should not have been + * ignored. + */ + ErrorIfWorkerErrorIndicationReceived(); + if (workerJob != NULL) { partitionKeyConst = workerJob->partitionKeyValue; diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index d5c39e4ff..bb5d02ba1 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -20,6 +20,7 @@ #include "commands/copy.h" #include "distributed/commands/multi_copy.h" #include "distributed/connection_management.h" +#include "distributed/error_codes.h" #include "distributed/intermediate_results.h" #include "distributed/listutils.h" #include "distributed/master_metadata_utility.h" @@ -810,7 +811,7 @@ ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat, * we could try to read a non-existing file. That is most likely * to happen during development. */ - ereport(WARNING, (errcode_for_file_access(), + ereport(WARNING, (errcode(ERRCODE_CITUS_INTERMEDIATE_RESULT_NOT_FOUND), errmsg("Query could not find the intermediate result file " "\"%s\", it was mostly likely deleted due to an " "error in a parallel process within the same " diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 719e7d498..b31f29d3d 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -68,7 +68,7 @@ static List *plannerRestrictionContextList = NIL; -int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log level */ +int MultiTaskQueryLogLevel = CITUS_LOG_LEVEL_OFF; /* multi-task query log level */ static uint64 NextPlanId = 1; /* keep track of planner call stack levels */ @@ -1321,7 +1321,7 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan) if (IsMultiTaskPlan(distributedPlan)) { /* if it is not a single task executable plan, inform user according to the log level */ - if (MultiTaskQueryLogLevel != MULTI_TASK_QUERY_INFO_OFF) + if (MultiTaskQueryLogLevel != CITUS_LOG_LEVEL_OFF) { ereport(MultiTaskQueryLogLevel, (errmsg( "multi-task query about to be executed"), diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 7811b1236..98aa28042 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -67,6 +67,7 @@ #include "distributed/task_tracker.h" #include "distributed/transaction_management.h" #include "distributed/transaction_recovery.h" +#include "distributed/worker_log_messages.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "distributed/worker_shard_visibility.h" @@ -164,14 +165,20 @@ static const struct config_enum_entry shard_commit_protocol_options[] = { { NULL, 0, false } }; -static const struct config_enum_entry multi_task_query_log_level_options[] = { - { "off", MULTI_TASK_QUERY_INFO_OFF, false }, - { "debug", DEBUG2, false }, - { "log", LOG, false }, - { "notice", NOTICE, false }, - { "warning", WARNING, false }, - { "error", ERROR, false }, - { NULL, 0, false } +static const struct config_enum_entry log_level_options[] = { + { "off", CITUS_LOG_LEVEL_OFF, false }, + { "debug5", DEBUG5, false}, + { "debug4", DEBUG4, false}, + { "debug3", DEBUG3, false}, + { "debug2", DEBUG2, false}, + { "debug1", DEBUG1, false}, + { "debug", DEBUG2, true}, + { "log", LOG, false}, + { "info", INFO, true}, + { "notice", NOTICE, false}, + { "warning", WARNING, false}, + { "error", ERROR, false}, + { NULL, 0, false} }; static const struct config_enum_entry multi_shard_modify_connection_options[] = { @@ -624,6 +631,18 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomEnumVariable( + "citus.worker_min_messages", + gettext_noop("Log messages from workers only if their log level is at or above " + "the configured level"), + NULL, + &WorkerMinMessages, + NOTICE, + log_level_options, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.explain_distributed_queries", gettext_noop("Enables Explain for distributed queries."), @@ -1248,7 +1267,7 @@ RegisterCitusConfigVariables(void) gettext_noop("Sets the level of multi task query execution log messages"), NULL, &MultiTaskQueryLogLevel, - MULTI_TASK_QUERY_INFO_OFF, multi_task_query_log_level_options, + CITUS_LOG_LEVEL_OFF, log_level_options, PGC_USERSET, GUC_STANDARD, NULL, NULL, NULL); diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index b6a541807..7a211b384 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -34,6 +34,7 @@ #include "distributed/shared_connection_stats.h" #include "distributed/subplan_execution.h" #include "distributed/version_compat.h" +#include "distributed/worker_log_messages.h" #include "utils/hsearch.h" #include "utils/guc.h" #include "utils/memutils.h" @@ -263,6 +264,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) case XACT_EVENT_ABORT: { + /* stop propagating notices from workers, we know the query is failed */ + DisableWorkerMessagePropagation(); + /* * FIXME: Add warning for the COORD_TRANS_COMMITTED case. That * can be reached if this backend fails after the @@ -328,7 +332,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) */ SubPlanLevel = 0; UnSetDistributedTransactionId(); - UnsetCitusNoticeLevel(); break; } @@ -451,6 +454,8 @@ ResetGlobalVariables() activeSetStmts = NULL; CoordinatedTransactionUses2PC = false; TransactionModifiedNodeMetadata = false; + + ResetWorkerErrorIndication(); } @@ -511,13 +516,23 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, case SUBXACT_EVENT_ABORT_SUB: { + /* + * Stop showing message for now, will re-enable when executing + * the next statement. + */ + DisableWorkerMessagePropagation(); + + /* + * Given that we aborted, worker error indications can be ignored. + */ + ResetWorkerErrorIndication(); + if (InCoordinatedTransaction()) { CoordinatedRemoteTransactionsSavepointRollback(subId); } PopSubXact(subId); - UnsetCitusNoticeLevel(); break; } diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 5471bc3cc..fdfe18568 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -27,9 +27,6 @@ /* used for libpq commands that get an error buffer. Postgres docs recommend 256. */ #define ERROR_BUFFER_SIZE 256 -/* default notice level */ -#define DEFAULT_CITUS_NOTICE_LEVEL DEBUG1 - /* application name used for internal connections in Citus */ #define CITUS_APPLICATION_NAME "citus" @@ -247,11 +244,6 @@ extern void FinishConnectionEstablishment(MultiConnection *connection); extern void ClaimConnectionExclusively(MultiConnection *connection); extern void UnclaimConnection(MultiConnection *connection); -/* dealing with notice handler */ -extern void SetCitusNoticeProcessor(MultiConnection *connection); -extern char * TrimLogLevel(const char *message); -extern void UnsetCitusNoticeLevel(void); - /* time utilities */ extern double MillisecondsPassedSince(instr_time moment); extern long MillisecondsToTimeout(instr_time start, long msAfterStart); diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index b0ecbc2ce..578d2bfa1 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -30,7 +30,6 @@ /* values used by jobs and tasks which do not require identifiers */ #define INVALID_JOB_ID 0 #define INVALID_TASK_ID 0 -#define MULTI_TASK_QUERY_INFO_OFF 0 /* do not log multi-task queries */ #define CURSOR_OPT_FORCE_DISTRIBUTED 0x080000 diff --git a/src/include/distributed/error_codes.h b/src/include/distributed/error_codes.h new file mode 100644 index 000000000..060f78d9e --- /dev/null +++ b/src/include/distributed/error_codes.h @@ -0,0 +1,21 @@ +/*------------------------------------------------------------------------- + * + * error_codes.h + * Error codes that are specific to Citus + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef CITUS_ERROR_CODES_H +#define CITUS_ERROR_CODES_H + + +#include "utils/elog.h" + + +#define ERRCODE_CITUS_INTERMEDIATE_RESULT_NOT_FOUND MAKE_SQLSTATE('C', 'I', 'I', 'N', 'F') + + +#endif /* CITUS_ERROR_CODES_H */ diff --git a/src/include/distributed/log_utils.h b/src/include/distributed/log_utils.h index f69bd27ec..51bc7d846 100644 --- a/src/include/distributed/log_utils.h +++ b/src/include/distributed/log_utils.h @@ -12,6 +12,9 @@ #include "utils/guc.h" +/* do not log */ +#define CITUS_LOG_LEVEL_OFF 0 + extern bool IsLoggableLevel(int logLevel); extern char * HashLogMessage(const char *text); diff --git a/src/include/distributed/worker_log_messages.h b/src/include/distributed/worker_log_messages.h new file mode 100644 index 000000000..ad1f2d6c3 --- /dev/null +++ b/src/include/distributed/worker_log_messages.h @@ -0,0 +1,28 @@ +/*------------------------------------------------------------------------- + * + * worker_log_messages.h + * Functions for handling log messages from the workers. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef WORKER_LOG_MESSAGES_H +#define WORKER_LOG_MESSAGES_H + + +#include "distributed/connection_management.h" + + +/* minimum log level for worker messages to be propagated */ +extern int WorkerMinMessages; + +void SetCitusNoticeReceiver(MultiConnection *connection); +void EnableWorkerMessagePropagation(void); +void DisableWorkerMessagePropagation(void); +void ErrorIfWorkerErrorIndicationReceived(void); +void ResetWorkerErrorIndication(void); + + +#endif /* WORKER_LOG_MESSAGES_H */ diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out index 5e25f0a4f..4d2f29c36 100644 --- a/src/test/regress/expected/distributed_functions.out +++ b/src/test/regress/expected/distributed_functions.out @@ -13,6 +13,48 @@ CREATE SCHEMA function_tests AUTHORIZATION functionuser; CREATE SCHEMA function_tests2 AUTHORIZATION functionuser; SET search_path TO function_tests; SET citus.shard_count TO 4; +-- test notice +CREATE TABLE notices ( + id int primary key, + message text +); +SELECT create_distributed_table('notices', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO notices VALUES (1, 'hello world'); +CREATE FUNCTION notice(text) +RETURNS void +LANGUAGE plpgsql AS $$ +BEGIN + RAISE NOTICE '%', $1; +END; +$$; +SELECT create_distributed_function('notice(text)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT notice(message) FROM notices WHERE id = 1; +NOTICE: hello world +DETAIL: from localhost:xxxxx + notice +--------------------------------------------------------------------- + +(1 row) + +-- should not see a NOTICE if worker_min_messages is WARNING +SET citus.worker_min_messages TO WARNING; +SELECT notice(message) FROM notices WHERE id = 1; + notice +--------------------------------------------------------------------- + +(1 row) + +RESET citus.worker_min_messages; -- Create and distribute a simple function CREATE FUNCTION eq(macaddr, macaddr) RETURNS bool AS 'select $1 = $2;' diff --git a/src/test/regress/expected/intermediate_results.out b/src/test/regress/expected/intermediate_results.out index fc46ab14a..c356d022e 100644 --- a/src/test/regress/expected/intermediate_results.out +++ b/src/test/regress/expected/intermediate_results.out @@ -94,12 +94,16 @@ FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', WHERE user_id = 'jon' OR true ORDER BY x; DEBUG: Router planner cannot handle multi-shard select queries -DEBUG: Query could not find the intermediate result file "squares", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction -DETAIL: WARNING from localhost:xxxxx - x | x2 ---------------------------------------------------------------------- -(0 rows) - +ERROR: Query could not find the intermediate result file "squares", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction +SET client_min_messages TO DEBUG; +SELECT x, x2 +FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in) +WHERE user_id = 'jon' +ORDER BY x; +DEBUG: Creating router plan +DEBUG: Plan is router executable +DETAIL: distribution column value: jon +ERROR: Query could not find the intermediate result file "squares", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction RESET client_min_messages; -- try to read the file as text, will fail because of binary encoding BEGIN; diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out index ef9e5e615..a0cc25c86 100644 --- a/src/test/regress/expected/local_shard_copy.out +++ b/src/test/regress/expected/local_shard_copy.out @@ -5,17 +5,17 @@ SET citus.next_shard_id TO 1570000; SET citus.replicate_reference_tables_on_activate TO off; SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); DEBUG: schema "public" already exists, skipping -DETAIL: NOTICE from localhost:xxxxx +DETAIL: from localhost:xxxxx DEBUG: extension "plpgsql" already exists, skipping -DETAIL: NOTICE from localhost:xxxxx +DETAIL: from localhost:xxxxx DEBUG: schema "citus_mx_test_schema" already exists, skipping -DETAIL: NOTICE from localhost:xxxxx +DETAIL: from localhost:xxxxx DEBUG: schema "citus_mx_test_schema_join_1" already exists, skipping -DETAIL: NOTICE from localhost:xxxxx +DETAIL: from localhost:xxxxx DEBUG: schema "citus_mx_test_schema_join_2" already exists, skipping -DETAIL: NOTICE from localhost:xxxxx +DETAIL: from localhost:xxxxx DEBUG: schema "citus_mx_schema_for_xacts" already exists, skipping -DETAIL: NOTICE from localhost:xxxxx +DETAIL: from localhost:xxxxx master_add_node --------------------------------------------------------------------- 32 diff --git a/src/test/regress/expected/multi_function_evaluation.out b/src/test/regress/expected/multi_function_evaluation.out index 6ad76423d..deeeb2049 100644 --- a/src/test/regress/expected/multi_function_evaluation.out +++ b/src/test/regress/expected/multi_function_evaluation.out @@ -180,10 +180,46 @@ SELECT create_distributed_function('stable_squared(int)'); UPDATE example SET value = timestamp '10-10-2000 00:00' FROM (SELECT key, stable_squared(count(*)::int) y FROM example GROUP BY key) a WHERE example.key = a.key; +NOTICE: stable_fn called +DETAIL: from localhost:xxxxx +NOTICE: stable_fn called +DETAIL: from localhost:xxxxx +NOTICE: stable_fn called +DETAIL: from localhost:xxxxx +NOTICE: stable_fn called +DETAIL: from localhost:xxxxx +NOTICE: stable_fn called +DETAIL: from localhost:xxxxx +NOTICE: stable_fn called +DETAIL: from localhost:xxxxx UPDATE example SET value = timestamp '10-10-2000 00:00' FROM (SELECT key, stable_squared((count(*) OVER ())::int) y FROM example GROUP BY key) a WHERE example.key = a.key; +NOTICE: stable_fn called +DETAIL: from localhost:xxxxx +NOTICE: stable_fn called +DETAIL: from localhost:xxxxx +NOTICE: stable_fn called +DETAIL: from localhost:xxxxx +NOTICE: stable_fn called +DETAIL: from localhost:xxxxx +NOTICE: stable_fn called +DETAIL: from localhost:xxxxx +NOTICE: stable_fn called +DETAIL: from localhost:xxxxx UPDATE example SET value = timestamp '10-10-2000 00:00' FROM (SELECT key, stable_squared(grouping(key)) y FROM example GROUP BY key) a WHERE example.key = a.key; +NOTICE: stable_fn called +DETAIL: from localhost:xxxxx +NOTICE: stable_fn called +DETAIL: from localhost:xxxxx +NOTICE: stable_fn called +DETAIL: from localhost:xxxxx +NOTICE: stable_fn called +DETAIL: from localhost:xxxxx +NOTICE: stable_fn called +DETAIL: from localhost:xxxxx +NOTICE: stable_fn called +DETAIL: from localhost:xxxxx DROP SCHEMA multi_function_evaluation CASCADE; NOTICE: drop cascades to 5 other objects DETAIL: drop cascades to table example diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index 7af0de8d9..6a05bec9b 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -988,7 +988,7 @@ FROM pg_dist_shard_placement AS sp, pg_dist_shard AS s WHERE sp.shardid = s.shardid AND (s.logicalrelid = 'reference_modifying_xacts'::regclass OR - s.logicalrelid = 'hash_modifying_xacts'::regclass) + s.logicalrelid = 'hash_modifying_xacts'::regclass) GROUP BY s.logicalrelid, sp.shardstate ORDER BY s.logicalrelid, sp.shardstate; logicalrelid | shardstate | count @@ -1035,7 +1035,7 @@ FROM pg_dist_shard_placement AS sp, pg_dist_shard AS s WHERE sp.shardid = s.shardid AND (s.logicalrelid = 'reference_modifying_xacts'::regclass OR - s.logicalrelid = 'hash_modifying_xacts'::regclass) + s.logicalrelid = 'hash_modifying_xacts'::regclass) GROUP BY s.logicalrelid, sp.shardstate ORDER BY s.logicalrelid, sp.shardstate; logicalrelid | shardstate | count @@ -1200,7 +1200,7 @@ SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) FROM pg_dist_shard_placement AS sp, pg_dist_shard AS s WHERE sp.shardid = s.shardid -AND s.logicalrelid = 'reference_failure_test'::regclass +AND s.logicalrelid = 'reference_failure_test'::regclass GROUP BY s.logicalrelid, sp.shardstate ORDER BY s.logicalrelid, sp.shardstate; logicalrelid | shardstate | count diff --git a/src/test/regress/expected/multi_mx_call.out b/src/test/regress/expected/multi_mx_call.out index 94f2260a7..968324bf4 100644 --- a/src/test/regress/expected/multi_mx_call.out +++ b/src/test/regress/expected/multi_mx_call.out @@ -385,7 +385,7 @@ DETAIL: A distributed function is created. To make sure subsequent commands see \set VERBOSITY terse call multi_mx_call.mx_call_proc_raise(2); DEBUG: pushing down the procedure -DEBUG: warning +WARNING: warning ERROR: error \set VERBOSITY default -- Test that we don't propagate to non-metadata worker nodes diff --git a/src/test/regress/expected/multi_mx_function_call_delegation.out b/src/test/regress/expected/multi_mx_function_call_delegation.out index a37eb52b5..0d20d7cdb 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation.out @@ -406,7 +406,7 @@ DETAIL: A distributed function is created. To make sure subsequent commands see \set VERBOSITY terse select mx_call_func_raise(2); DEBUG: pushing down the function call -DEBUG: warning +WARNING: warning ERROR: error \set VERBOSITY default -- Don't push-down when doing INSERT INTO ... SELECT func(); diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 2a7eb0a28..9bdbb1c19 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -461,6 +461,7 @@ if ($followercluster) # shard_count to 4 to speed up the tests. if($isolationtester) { + push(@pgOptions, '-c', "citus.worker_min_messages='warning'"); push(@pgOptions, '-c', "citus.log_distributed_deadlock_detection=on"); push(@pgOptions, '-c', "citus.distributed_deadlock_detection_factor=-1"); push(@pgOptions, '-c', "citus.shard_count=4"); diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql index b4d479260..f8d7ac08d 100644 --- a/src/test/regress/sql/distributed_functions.sql +++ b/src/test/regress/sql/distributed_functions.sql @@ -9,6 +9,29 @@ CREATE SCHEMA function_tests2 AUTHORIZATION functionuser; SET search_path TO function_tests; SET citus.shard_count TO 4; +-- test notice +CREATE TABLE notices ( + id int primary key, + message text +); +SELECT create_distributed_table('notices', 'id'); +INSERT INTO notices VALUES (1, 'hello world'); + +CREATE FUNCTION notice(text) +RETURNS void +LANGUAGE plpgsql AS $$ +BEGIN + RAISE NOTICE '%', $1; +END; +$$; +SELECT create_distributed_function('notice(text)'); +SELECT notice(message) FROM notices WHERE id = 1; + +-- should not see a NOTICE if worker_min_messages is WARNING +SET citus.worker_min_messages TO WARNING; +SELECT notice(message) FROM notices WHERE id = 1; +RESET citus.worker_min_messages; + -- Create and distribute a simple function CREATE FUNCTION eq(macaddr, macaddr) RETURNS bool AS 'select $1 = $2;' diff --git a/src/test/regress/sql/intermediate_results.sql b/src/test/regress/sql/intermediate_results.sql index 156cd09ac..b260f5ea2 100644 --- a/src/test/regress/sql/intermediate_results.sql +++ b/src/test/regress/sql/intermediate_results.sql @@ -51,6 +51,12 @@ FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', WHERE user_id = 'jon' OR true ORDER BY x; +SET client_min_messages TO DEBUG; +SELECT x, x2 +FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in) +WHERE user_id = 'jon' +ORDER BY x; + RESET client_min_messages; -- try to read the file as text, will fail because of binary encoding BEGIN;