Add initial helpers to make interactions with MultiConnection et al. easier.

This includes basic infrastructure for logging of commands sent to
remote/worker nodes. Note that this has no effect as of yet, since no
callers are converted to the new infrastructure.
pull/863/head
Andres Freund 2016-12-06 14:38:58 -08:00
parent 3223b3c92d
commit 3505d431cd
5 changed files with 241 additions and 32 deletions

View File

@ -0,0 +1,195 @@
/*-------------------------------------------------------------------------
*
* remote_commands.c
* Helpers to make it easier to execute command on remote nodes.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "libpq-fe.h"
#include "distributed/connection_management.h"
#include "distributed/remote_commands.h"
/* GUC, determining whether statements sent to remote nodes are logged */
bool LogRemoteCommands = false;
/* simple helpers */
/*
* IsResponseOK checks whether the result is a successful one.
*/
bool
IsResponseOK(PGresult *result)
{
ExecStatusType resultStatus = PQresultStatus(result);
if (resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK ||
resultStatus == PGRES_COMMAND_OK)
{
return true;
}
return false;
}
/*
* ForgetResults clears a connection from pending activity.
*
* XXX: In the future it might be a good idea to use use PQcancel() if results
* would require network IO.
*/
void
ForgetResults(MultiConnection *connection)
{
while (true)
{
PGresult *result = NULL;
result = PQgetResult(connection->pgConn);
if (result == NULL)
{
break;
}
if (PQresultStatus(result) == PGRES_COPY_IN)
{
PQputCopyEnd(connection->pgConn, NULL);
/* TODO: mark transaction as failed, once we can. */
}
PQclear(result);
}
}
/*
* SqlStateMatchesCategory returns true if the given sql state (which may be
* NULL if unknown) is in the given error category. Note that we use
* ERRCODE_TO_CATEGORY macro to determine error category of the sql state and
* expect the caller to use the same macro for the error category.
*/
bool
SqlStateMatchesCategory(char *sqlStateString, int category)
{
bool sqlStateMatchesCategory = false;
int sqlState = 0;
int sqlStateCategory = 0;
if (sqlStateString == NULL)
{
return false;
}
sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2],
sqlStateString[3], sqlStateString[4]);
sqlStateCategory = ERRCODE_TO_CATEGORY(sqlState);
if (sqlStateCategory == category)
{
sqlStateMatchesCategory = true;
}
return sqlStateMatchesCategory;
}
/* report errors & warnings */
/*
* Report libpq failure that's not associated with a result.
*/
void
ReportConnectionError(MultiConnection *connection, int elevel)
{
char *nodeName = connection->hostname;
int nodePort = connection->port;
ereport(elevel, (errmsg("connection error: %s:%d", nodeName, nodePort),
errdetail("%s", PQerrorMessage(connection->pgConn))));
}
/*
* ReportResultError reports libpq failure associated with a result.
*/
void
ReportResultError(MultiConnection *connection, PGresult *result, int elevel)
{
char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
char *messagePrimary = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
char *messageDetail = PQresultErrorField(result, PG_DIAG_MESSAGE_DETAIL);
char *messageHint = PQresultErrorField(result, PG_DIAG_MESSAGE_HINT);
char *messageContext = PQresultErrorField(result, PG_DIAG_CONTEXT);
char *nodeName = connection->hostname;
int nodePort = connection->port;
int sqlState = ERRCODE_INTERNAL_ERROR;
if (sqlStateString != NULL)
{
sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2],
sqlStateString[3], sqlStateString[4]);
}
/*
* If the PGresult did not contain a message, the connection may provide a
* suitable top level one. At worst, this is an empty string.
*/
if (messagePrimary == NULL)
{
char *lastNewlineIndex = NULL;
messagePrimary = PQerrorMessage(connection->pgConn);
lastNewlineIndex = strrchr(messagePrimary, '\n');
/* trim trailing newline, if any */
if (lastNewlineIndex != NULL)
{
*lastNewlineIndex = '\0';
}
}
ereport(elevel, (errcode(sqlState), errmsg("%s", messagePrimary),
messageDetail ? errdetail("%s", messageDetail) : 0,
messageHint ? errhint("%s", messageHint) : 0,
messageContext ? errcontext("%s", messageContext) : 0,
errcontext("while executing command on %s:%d",
nodeName, nodePort)));
}
/*
* LogRemoteCommand logs commands send to remote nodes if
* citus.log_remote_commands wants us to do so.
*/
void
LogRemoteCommand(MultiConnection *connection, const char *command)
{
if (!LogRemoteCommands)
{
return;
}
ereport(LOG, (errmsg("issuing %s", command),
errdetail("on server %s:%d", connection->hostname, connection->port)));
}
/* wrappers around libpq functions, with command logging support */
/*
* SendRemoteCommand is a tiny PQsendQuery wrapper that logs remote commands,
* and accepts a MultiConnection instead of a plain PGconn.
*/
int
SendRemoteCommand(MultiConnection *connection, const char *command)
{
LogRemoteCommand(connection, command);
return PQsendQuery(connection->pgConn, command);
}

View File

@ -277,6 +277,16 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL, GUC_NO_SHOW_ALL,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.log_remote_commands",
gettext_noop("Log queries sent to other nodes in the server log"),
NULL,
&LogRemoteCommands,
false,
PGC_USERSET,
0,
NULL, NULL, NULL);
DefineCustomBoolVariable( DefineCustomBoolVariable(
"citus.explain_multi_logical_plan", "citus.explain_multi_logical_plan",
gettext_noop("Enables Explain to print out distributed logical plans."), gettext_noop("Enables Explain to print out distributed logical plans."),

View File

@ -193,37 +193,6 @@ PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey)
} }
/*
* SqlStateMatchesCategory returns true if the given sql state (which may be
* NULL if unknown) is in the given error category. Note that we use
* ERRCODE_TO_CATEGORY macro to determine error category of the sql state and
* expect the caller to use the same macro for the error category.
*/
bool
SqlStateMatchesCategory(char *sqlStateString, int category)
{
bool sqlStateMatchesCategory = false;
int sqlState = 0;
int sqlStateCategory = 0;
if (sqlStateString == NULL)
{
return false;
}
sqlState = MAKE_SQLSTATE(sqlStateString[0], sqlStateString[1], sqlStateString[2],
sqlStateString[3], sqlStateString[4]);
sqlStateCategory = ERRCODE_TO_CATEGORY(sqlState);
if (sqlStateCategory == category)
{
sqlStateMatchesCategory = true;
}
return sqlStateMatchesCategory;
}
/* /*
* WarnRemoteError retrieves error fields from a remote result and produces an * WarnRemoteError retrieves error fields from a remote result and produces an
* error report at the WARNING level after amending the error with a CONTEXT * error report at the WARNING level after amending the error with a CONTEXT

View File

@ -57,7 +57,6 @@ extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
extern void PurgeConnection(PGconn *connection); extern void PurgeConnection(PGconn *connection);
extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey); extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey);
extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey); extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey);
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
extern void WarnRemoteError(PGconn *connection, PGresult *result); extern void WarnRemoteError(PGconn *connection, PGresult *result);
extern void ReraiseRemoteError(PGconn *connection, PGresult *result); extern void ReraiseRemoteError(PGconn *connection, PGresult *result);
extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser); extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser);

View File

@ -0,0 +1,36 @@
/*-------------------------------------------------------------------------
*
* remote_commands.h
* Helpers to execute commands on remote nodes, over libpq.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef REMOTE_COMMAND_H
#define REMOTE_COMMAND_H
#include "distributed/connection_management.h"
/* GUC, determining whether statements sent to remote nodes are logged */
extern bool LogRemoteCommands;
/* simple helpers */
extern bool IsResponseOK(struct pg_result *result);
extern void ForgetResults(MultiConnection *connection);
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
/* report errors & warnings */
extern void ReportConnectionError(MultiConnection *connection, int elevel);
extern void ReportResultError(MultiConnection *connection, struct pg_result *result,
int elevel);
extern void LogRemoteCommand(MultiConnection *connection, const char *command);
/* wrappers around libpq functions, with command logging support */
extern int SendRemoteCommand(MultiConnection *connection, const char *command);
#endif /* REMOTE_COMMAND_H */