Send explain queries with savepoints

With this commit, we started to send explain queries within a savepoint. After
running explain query, we rollback to savepoint. This saves us from side effects
of EXPLAIN ANALYZE on DML queries.
pull/1361/head
Metin Doslu 2017-04-24 17:41:48 +03:00
parent 905ca98a9b
commit b6659bec22
6 changed files with 133 additions and 4 deletions

View File

@ -14,6 +14,7 @@
#include "distributed/connection_management.h"
#include "distributed/remote_commands.h"
#include "lib/stringinfo.h"
#include "miscadmin.h"
#include "storage/latch.h"
@ -325,6 +326,38 @@ SendRemoteCommand(MultiConnection *connection, const char *command)
}
/*
* ReadFirstColumnAsText reads the first column of result tuples from the given
* PGresult struct and returns them in a StringInfo list.
*/
List *
ReadFirstColumnAsText(PGresult *queryResult)
{
List *resultRowList = NIL;
const int columnIndex = 0;
int64 rowIndex = 0;
int64 rowCount = 0;
ExecStatusType status = PQresultStatus(queryResult);
if (status == PGRES_TUPLES_OK)
{
rowCount = PQntuples(queryResult);
}
for (rowIndex = 0; rowIndex < rowCount; rowIndex++)
{
char *rowValue = PQgetvalue(queryResult, rowIndex, columnIndex);
StringInfo rowValueString = makeStringInfo();
appendStringInfoString(rowValueString, rowValue);
resultRowList = lappend(resultRowList, rowValueString);
}
return resultRowList;
}
/*
* GetRemoteCommandResult is a wrapper around PQgetResult() that handles interrupts.
*

View File

@ -8,6 +8,7 @@
*/
#include "postgres.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "access/xact.h"
@ -20,6 +21,7 @@
#include "commands/tablecmds.h"
#include "optimizer/cost.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/connection_management.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_explain.h"
@ -29,6 +31,8 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/remote_commands.h"
#include "distributed/placement_connection.h"
#include "distributed/worker_protocol.h"
#include "lib/stringinfo.h"
#include "nodes/plannodes.h"
@ -288,15 +292,60 @@ RemoteExplain(Task *task, ExplainState *es)
remotePlan = (RemoteExplainPlan *) palloc0(sizeof(RemoteExplainPlan));
explainQuery = BuildRemoteExplainQuery(task->queryString, es);
/*
* Use a coordinated transaction to ensure that we open a transaction block
* such that we can set a savepoint.
*/
BeginOrContinueCoordinatedTransaction();
for (placementIndex = 0; placementIndex < placementCount; placementIndex++)
{
ShardPlacement *taskPlacement = list_nth(taskPlacementList, placementIndex);
char *nodeName = taskPlacement->nodeName;
uint32 nodePort = taskPlacement->nodePort;
MultiConnection *connection = NULL;
PGresult *queryResult = NULL;
int connectionFlags = 0;
int executeResult = 0;
remotePlan->placementIndex = placementIndex;
remotePlan->explainOutputList = ExecuteRemoteQuery(nodeName, nodePort,
NULL, explainQuery);
connection = GetPlacementConnection(connectionFlags, taskPlacement, NULL);
/* try other placements if we fail to connect this one */
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
continue;
}
RemoteTransactionBeginIfNecessary(connection);
/*
* Start a savepoint for the explain query. After running the explain
* query, we will rollback to this savepoint. This saves us from side
* effects of EXPLAIN ANALYZE on DML queries.
*/
ExecuteCriticalRemoteCommand(connection, "SAVEPOINT citus_explain_savepoint");
/* run explain query */
executeResult = ExecuteOptionalRemoteCommand(connection, explainQuery->data,
&queryResult);
if (executeResult != 0)
{
PQclear(queryResult);
ForgetResults(connection);
continue;
}
/* read explain query results */
remotePlan->explainOutputList = ReadFirstColumnAsText(queryResult);
PQclear(queryResult);
ForgetResults(connection);
/* rollback to the savepoint */
ExecuteCriticalRemoteCommand(connection,
"ROLLBACK TO SAVEPOINT citus_explain_savepoint");
if (remotePlan->explainOutputList != NIL)
{
break;

View File

@ -44,6 +44,7 @@ extern int SendRemoteCommand(MultiConnection *connection, const char *command);
extern int SendRemoteCommandParams(MultiConnection *connection, const char *command,
int parameterCount, const Oid *parameterTypes,
const char *const *parameterValues);
extern List * ReadFirstColumnAsText(struct pg_result *queryResult);
extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection,
bool raiseInterrupts);

View File

@ -669,3 +669,20 @@ Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0)
Node: host=localhost port=57637 dbname=regression
-> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem (cost=0.28..11.83 rows=3 width=5)
Index Cond: (l_orderkey = 5)
-- test explain in a transaction with alter table to test we use right connections
BEGIN;
CREATE TABLE explain_table(id int);
SELECT create_distributed_table('explain_table', 'id');
ALTER TABLE explain_table ADD COLUMN value int;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
EXPLAIN (COSTS FALSE) SELECT value FROM explain_table WHERE id = 1;
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Seq Scan on explain_table_570001 explain_table
Filter: (id = 1)
ROLLBACK;

View File

@ -640,3 +640,20 @@ Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0)
Node: host=localhost port=57637 dbname=regression
-> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem (cost=0.28..11.83 rows=3 width=5)
Index Cond: (l_orderkey = 5)
-- test explain in a transaction with alter table to test we use right connections
BEGIN;
CREATE TABLE explain_table(id int);
SELECT create_distributed_table('explain_table', 'id');
ALTER TABLE explain_table ADD COLUMN value int;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
EXPLAIN (COSTS FALSE) SELECT value FROM explain_table WHERE id = 1;
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Seq Scan on explain_table_570001 explain_table
Filter: (id = 1)
ROLLBACK;

View File

@ -235,3 +235,15 @@ EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query;
-- at least make sure to fail without crashing
PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1;
EXPLAIN EXECUTE router_executor_query_param(5);
-- test explain in a transaction with alter table to test we use right connections
BEGIN;
CREATE TABLE explain_table(id int);
SELECT create_distributed_table('explain_table', 'id');
ALTER TABLE explain_table ADD COLUMN value int;
EXPLAIN (COSTS FALSE) SELECT value FROM explain_table WHERE id = 1;
ROLLBACK;