mirror of https://github.com/citusdata/citus.git
Merge pull request #1361 from citusdata/explain_with_savepoint
Send explain queries with savepointspull/1365/head
commit
59ecf9faa0
|
@ -14,6 +14,7 @@
|
||||||
|
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
|
#include "lib/stringinfo.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
#include "storage/latch.h"
|
#include "storage/latch.h"
|
||||||
|
|
||||||
|
@ -325,6 +326,38 @@ SendRemoteCommand(MultiConnection *connection, const char *command)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ReadFirstColumnAsText reads the first column of result tuples from the given
|
||||||
|
* PGresult struct and returns them in a StringInfo list.
|
||||||
|
*/
|
||||||
|
List *
|
||||||
|
ReadFirstColumnAsText(PGresult *queryResult)
|
||||||
|
{
|
||||||
|
List *resultRowList = NIL;
|
||||||
|
const int columnIndex = 0;
|
||||||
|
int64 rowIndex = 0;
|
||||||
|
int64 rowCount = 0;
|
||||||
|
|
||||||
|
ExecStatusType status = PQresultStatus(queryResult);
|
||||||
|
if (status == PGRES_TUPLES_OK)
|
||||||
|
{
|
||||||
|
rowCount = PQntuples(queryResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (rowIndex = 0; rowIndex < rowCount; rowIndex++)
|
||||||
|
{
|
||||||
|
char *rowValue = PQgetvalue(queryResult, rowIndex, columnIndex);
|
||||||
|
|
||||||
|
StringInfo rowValueString = makeStringInfo();
|
||||||
|
appendStringInfoString(rowValueString, rowValue);
|
||||||
|
|
||||||
|
resultRowList = lappend(resultRowList, rowValueString);
|
||||||
|
}
|
||||||
|
|
||||||
|
return resultRowList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GetRemoteCommandResult is a wrapper around PQgetResult() that handles interrupts.
|
* GetRemoteCommandResult is a wrapper around PQgetResult() that handles interrupts.
|
||||||
*
|
*
|
||||||
|
|
|
@ -8,6 +8,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
|
#include "libpq-fe.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
|
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
|
@ -20,6 +21,7 @@
|
||||||
#include "commands/tablecmds.h"
|
#include "commands/tablecmds.h"
|
||||||
#include "optimizer/cost.h"
|
#include "optimizer/cost.h"
|
||||||
#include "distributed/citus_nodefuncs.h"
|
#include "distributed/citus_nodefuncs.h"
|
||||||
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/multi_client_executor.h"
|
#include "distributed/multi_client_executor.h"
|
||||||
#include "distributed/multi_executor.h"
|
#include "distributed/multi_executor.h"
|
||||||
#include "distributed/multi_explain.h"
|
#include "distributed/multi_explain.h"
|
||||||
|
@ -29,6 +31,8 @@
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
#include "distributed/multi_planner.h"
|
#include "distributed/multi_planner.h"
|
||||||
#include "distributed/multi_server_executor.h"
|
#include "distributed/multi_server_executor.h"
|
||||||
|
#include "distributed/remote_commands.h"
|
||||||
|
#include "distributed/placement_connection.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
#include "lib/stringinfo.h"
|
#include "lib/stringinfo.h"
|
||||||
#include "nodes/plannodes.h"
|
#include "nodes/plannodes.h"
|
||||||
|
@ -288,15 +292,60 @@ RemoteExplain(Task *task, ExplainState *es)
|
||||||
remotePlan = (RemoteExplainPlan *) palloc0(sizeof(RemoteExplainPlan));
|
remotePlan = (RemoteExplainPlan *) palloc0(sizeof(RemoteExplainPlan));
|
||||||
explainQuery = BuildRemoteExplainQuery(task->queryString, es);
|
explainQuery = BuildRemoteExplainQuery(task->queryString, es);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Use a coordinated transaction to ensure that we open a transaction block
|
||||||
|
* such that we can set a savepoint.
|
||||||
|
*/
|
||||||
|
BeginOrContinueCoordinatedTransaction();
|
||||||
|
|
||||||
for (placementIndex = 0; placementIndex < placementCount; placementIndex++)
|
for (placementIndex = 0; placementIndex < placementCount; placementIndex++)
|
||||||
{
|
{
|
||||||
ShardPlacement *taskPlacement = list_nth(taskPlacementList, placementIndex);
|
ShardPlacement *taskPlacement = list_nth(taskPlacementList, placementIndex);
|
||||||
char *nodeName = taskPlacement->nodeName;
|
MultiConnection *connection = NULL;
|
||||||
uint32 nodePort = taskPlacement->nodePort;
|
PGresult *queryResult = NULL;
|
||||||
|
int connectionFlags = 0;
|
||||||
|
int executeResult = 0;
|
||||||
|
|
||||||
remotePlan->placementIndex = placementIndex;
|
remotePlan->placementIndex = placementIndex;
|
||||||
remotePlan->explainOutputList = ExecuteRemoteQuery(nodeName, nodePort,
|
|
||||||
NULL, explainQuery);
|
connection = GetPlacementConnection(connectionFlags, taskPlacement, NULL);
|
||||||
|
|
||||||
|
/* try other placements if we fail to connect this one */
|
||||||
|
if (PQstatus(connection->pgConn) != CONNECTION_OK)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
RemoteTransactionBeginIfNecessary(connection);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Start a savepoint for the explain query. After running the explain
|
||||||
|
* query, we will rollback to this savepoint. This saves us from side
|
||||||
|
* effects of EXPLAIN ANALYZE on DML queries.
|
||||||
|
*/
|
||||||
|
ExecuteCriticalRemoteCommand(connection, "SAVEPOINT citus_explain_savepoint");
|
||||||
|
|
||||||
|
/* run explain query */
|
||||||
|
executeResult = ExecuteOptionalRemoteCommand(connection, explainQuery->data,
|
||||||
|
&queryResult);
|
||||||
|
if (executeResult != 0)
|
||||||
|
{
|
||||||
|
PQclear(queryResult);
|
||||||
|
ForgetResults(connection);
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* read explain query results */
|
||||||
|
remotePlan->explainOutputList = ReadFirstColumnAsText(queryResult);
|
||||||
|
|
||||||
|
PQclear(queryResult);
|
||||||
|
ForgetResults(connection);
|
||||||
|
|
||||||
|
/* rollback to the savepoint */
|
||||||
|
ExecuteCriticalRemoteCommand(connection,
|
||||||
|
"ROLLBACK TO SAVEPOINT citus_explain_savepoint");
|
||||||
|
|
||||||
if (remotePlan->explainOutputList != NIL)
|
if (remotePlan->explainOutputList != NIL)
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -44,6 +44,7 @@ extern int SendRemoteCommand(MultiConnection *connection, const char *command);
|
||||||
extern int SendRemoteCommandParams(MultiConnection *connection, const char *command,
|
extern int SendRemoteCommandParams(MultiConnection *connection, const char *command,
|
||||||
int parameterCount, const Oid *parameterTypes,
|
int parameterCount, const Oid *parameterTypes,
|
||||||
const char *const *parameterValues);
|
const char *const *parameterValues);
|
||||||
|
extern List * ReadFirstColumnAsText(struct pg_result *queryResult);
|
||||||
extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection,
|
extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection,
|
||||||
bool raiseInterrupts);
|
bool raiseInterrupts);
|
||||||
|
|
||||||
|
|
|
@ -669,3 +669,20 @@ Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0)
|
||||||
Node: host=localhost port=57637 dbname=regression
|
Node: host=localhost port=57637 dbname=regression
|
||||||
-> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem (cost=0.28..11.83 rows=3 width=5)
|
-> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem (cost=0.28..11.83 rows=3 width=5)
|
||||||
Index Cond: (l_orderkey = 5)
|
Index Cond: (l_orderkey = 5)
|
||||||
|
-- test explain in a transaction with alter table to test we use right connections
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE explain_table(id int);
|
||||||
|
SELECT create_distributed_table('explain_table', 'id');
|
||||||
|
|
||||||
|
ALTER TABLE explain_table ADD COLUMN value int;
|
||||||
|
NOTICE: using one-phase commit for distributed DDL commands
|
||||||
|
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
||||||
|
EXPLAIN (COSTS FALSE) SELECT value FROM explain_table WHERE id = 1;
|
||||||
|
Custom Scan (Citus Router)
|
||||||
|
Task Count: 1
|
||||||
|
Tasks Shown: All
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=57637 dbname=regression
|
||||||
|
-> Seq Scan on explain_table_570001 explain_table
|
||||||
|
Filter: (id = 1)
|
||||||
|
ROLLBACK;
|
||||||
|
|
|
@ -640,3 +640,20 @@ Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0)
|
||||||
Node: host=localhost port=57637 dbname=regression
|
Node: host=localhost port=57637 dbname=regression
|
||||||
-> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem (cost=0.28..11.83 rows=3 width=5)
|
-> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem (cost=0.28..11.83 rows=3 width=5)
|
||||||
Index Cond: (l_orderkey = 5)
|
Index Cond: (l_orderkey = 5)
|
||||||
|
-- test explain in a transaction with alter table to test we use right connections
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE explain_table(id int);
|
||||||
|
SELECT create_distributed_table('explain_table', 'id');
|
||||||
|
|
||||||
|
ALTER TABLE explain_table ADD COLUMN value int;
|
||||||
|
NOTICE: using one-phase commit for distributed DDL commands
|
||||||
|
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
||||||
|
EXPLAIN (COSTS FALSE) SELECT value FROM explain_table WHERE id = 1;
|
||||||
|
Custom Scan (Citus Router)
|
||||||
|
Task Count: 1
|
||||||
|
Tasks Shown: All
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=57637 dbname=regression
|
||||||
|
-> Seq Scan on explain_table_570001 explain_table
|
||||||
|
Filter: (id = 1)
|
||||||
|
ROLLBACK;
|
||||||
|
|
|
@ -235,3 +235,15 @@ EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query;
|
||||||
-- at least make sure to fail without crashing
|
-- at least make sure to fail without crashing
|
||||||
PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1;
|
PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1;
|
||||||
EXPLAIN EXECUTE router_executor_query_param(5);
|
EXPLAIN EXECUTE router_executor_query_param(5);
|
||||||
|
|
||||||
|
-- test explain in a transaction with alter table to test we use right connections
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
CREATE TABLE explain_table(id int);
|
||||||
|
SELECT create_distributed_table('explain_table', 'id');
|
||||||
|
|
||||||
|
ALTER TABLE explain_table ADD COLUMN value int;
|
||||||
|
|
||||||
|
EXPLAIN (COSTS FALSE) SELECT value FROM explain_table WHERE id = 1;
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
|
Loading…
Reference in New Issue