diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index f2e75291a..6be2ba898 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -266,6 +266,16 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.log_remote_commands", + gettext_noop("Log queries sent to other nodes in the server log"), + NULL, + &LogRemoteCommands, + false, + PGC_USERSET, + 0, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.explain_multi_logical_plan", gettext_noop("Enables Explain to print out distributed logical plans."), diff --git a/src/backend/distributed/utils/remote_commands.c b/src/backend/distributed/utils/remote_commands.c new file mode 100644 index 000000000..cdafaa161 --- /dev/null +++ b/src/backend/distributed/utils/remote_commands.c @@ -0,0 +1,195 @@ +/*------------------------------------------------------------------------- + * + * remote_commands.c + * Helpers to make it easier to execute command on remote nodes. + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "libpq-fe.h" + +#include "distributed/connection_management.h" +#include "distributed/remote_commands.h" + + +/* GUC, determining whether statements sent to remote nodes are logged */ +bool LogRemoteCommands = false; + + +/* simple helpers */ + +/* + * IsResponseOK checks whether the result is a successful one. + */ +bool +IsResponseOK(PGresult *result) +{ + ExecStatusType resultStatus = PQresultStatus(result); + + if (resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK || + resultStatus == PGRES_COMMAND_OK) + { + return true; + } + + return false; +} + + +/* + * Clear connection from current activity. + * + * FIXME: This probably should use PQcancel() if results would require network + * IO. + */ +void +ForgetResults(MultiConnection *connection) +{ + while (true) + { + PGresult *result = NULL; + result = PQgetResult(connection->conn); + if (result == NULL) + { + break; + } + if (PQresultStatus(result) == PGRES_COPY_IN) + { + PQputCopyEnd(connection->conn, NULL); + + /* FIXME: mark connection as failed? */ + } + PQclear(result); + } +} + + +/* + * SqlStateMatchesCategory returns true if the given sql state (which may be + * NULL if unknown) is in the given error category. Note that we use + * ERRCODE_TO_CATEGORY macro to determine error category of the sql state and + * expect the caller to use the same macro for the error category. + */ +bool +SqlStateMatchesCategory(char *sqlStateString, int category) +{ + bool sqlStateMatchesCategory = false; + int sqlState = 0; + int sqlStateCategory = 0; + + if (sqlStateString == NULL) + { + return false; + } + + sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2], + sqlStateString[3], sqlStateString[4]); + + sqlStateCategory = ERRCODE_TO_CATEGORY(sqlState); + if (sqlStateCategory == category) + { + sqlStateMatchesCategory = true; + } + + return sqlStateMatchesCategory; +} + + +/* report errors & warnings */ + +/* + * Report libpq failure that's not associated with a result. + */ +void +ReportConnectionError(MultiConnection *connection, int elevel) +{ + char *nodeName = connection->hostname; + int nodePort = connection->port; + + ereport(elevel, (errmsg("connection error: %s:%d", nodeName, nodePort), + errdetail("%s", PQerrorMessage(connection->conn)))); +} + + +/* + * Report libpq failure associated with a result. + */ +void +ReportResultError(MultiConnection *connection, PGresult *result, int elevel) +{ + char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE); + char *messagePrimary = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY); + char *messageDetail = PQresultErrorField(result, PG_DIAG_MESSAGE_DETAIL); + char *messageHint = PQresultErrorField(result, PG_DIAG_MESSAGE_HINT); + char *messageContext = PQresultErrorField(result, PG_DIAG_CONTEXT); + + char *nodeName = connection->hostname; + int nodePort = connection->port; + int sqlState = ERRCODE_INTERNAL_ERROR; + + if (sqlStateString != NULL) + { + sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2], + sqlStateString[3], sqlStateString[4]); + } + + /* + * If the PGresult did not contain a message, the connection may provide a + * suitable top level one. At worst, this is an empty string. + */ + if (messagePrimary == NULL) + { + char *lastNewlineIndex = NULL; + + messagePrimary = PQerrorMessage(connection->conn); + lastNewlineIndex = strrchr(messagePrimary, '\n'); + + /* trim trailing newline, if any */ + if (lastNewlineIndex != NULL) + { + *lastNewlineIndex = '\0'; + } + } + + ereport(elevel, (errcode(sqlState), errmsg("%s", messagePrimary), + messageDetail ? errdetail("%s", messageDetail) : 0, + messageHint ? errhint("%s", messageHint) : 0, + messageContext ? errcontext("%s", messageContext) : 0, + errcontext("while executing command on %s:%d", + nodeName, nodePort))); +} + + +/* + * Log commands send to remote nodes if citus.log_remote_commands wants us to + * do so. + */ +void +LogRemoteCommand(MultiConnection *connection, const char *command) +{ + if (!LogRemoteCommands) + { + return; + } + + ereport(LOG, (errmsg("issuing %s", command), + errdetail("on server %s:%d", connection->hostname, connection->port))); +} + + +/* wrappers around libpq functions, with command logging support */ + +/* + * Tiny PQsendQuery wrapper that logs remote commands, and accepts a + * MultiConnection instead of a plain PGconn. + */ +int +SendRemoteCommand(MultiConnection *connection, const char *command) +{ + LogRemoteCommand(connection, command); + return PQsendQuery(connection->conn, command); +} diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h new file mode 100644 index 000000000..6ce25ccfd --- /dev/null +++ b/src/include/distributed/remote_commands.h @@ -0,0 +1,36 @@ +/*------------------------------------------------------------------------- + * + * remote_commands.h + * Helpers to execute commands on remote nodes, over libpq. + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef REMOTE_COMMAND_H +#define REMOTE_COMMAND_H + +#include "distributed/connection_management.h" + + +/* GUC, determining whether statements sent to remote nodes are logged */ +extern bool LogRemoteCommands; + + +/* simple helpers */ +extern bool IsResponseOK(struct pg_result *result); +extern void ForgetResults(MultiConnection *connection); +extern bool SqlStateMatchesCategory(char *sqlStateString, int category); + +/* report errors & warnings */ +extern void ReportConnectionError(MultiConnection *connection, int elevel); +extern void ReportResultError(MultiConnection *connection, struct pg_result *result, + int elevel); +extern void LogRemoteCommand(MultiConnection *connection, const char *command); + +/* wrappers around libpq functions, with command logging support */ +extern int SendRemoteCommand(MultiConnection *connection, const char *command); + + +#endif /* REMOTE_COMMAND_H */