diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 3c9bf07f5..050b8eee2 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -14,6 +14,7 @@ #include "distributed/connection_management.h" #include "distributed/remote_commands.h" +#include "lib/stringinfo.h" #include "miscadmin.h" #include "storage/latch.h" @@ -325,6 +326,38 @@ SendRemoteCommand(MultiConnection *connection, const char *command) } +/* + * ReadFirstColumnAsText reads the first column of result tuples from the given + * PGresult struct and returns them in a StringInfo list. + */ +List * +ReadFirstColumnAsText(PGresult *queryResult) +{ + List *resultRowList = NIL; + const int columnIndex = 0; + int64 rowIndex = 0; + int64 rowCount = 0; + + ExecStatusType status = PQresultStatus(queryResult); + if (status == PGRES_TUPLES_OK) + { + rowCount = PQntuples(queryResult); + } + + for (rowIndex = 0; rowIndex < rowCount; rowIndex++) + { + char *rowValue = PQgetvalue(queryResult, rowIndex, columnIndex); + + StringInfo rowValueString = makeStringInfo(); + appendStringInfoString(rowValueString, rowValue); + + resultRowList = lappend(resultRowList, rowValueString); + } + + return resultRowList; +} + + /* * GetRemoteCommandResult is a wrapper around PQgetResult() that handles interrupts. * diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 25fa2ab9c..52d2e10dc 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -8,6 +8,7 @@ */ #include "postgres.h" +#include "libpq-fe.h" #include "miscadmin.h" #include "access/xact.h" @@ -20,6 +21,7 @@ #include "commands/tablecmds.h" #include "optimizer/cost.h" #include "distributed/citus_nodefuncs.h" +#include "distributed/connection_management.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_executor.h" #include "distributed/multi_explain.h" @@ -29,6 +31,8 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_planner.h" #include "distributed/multi_server_executor.h" +#include "distributed/remote_commands.h" +#include "distributed/placement_connection.h" #include "distributed/worker_protocol.h" #include "lib/stringinfo.h" #include "nodes/plannodes.h" @@ -288,15 +292,60 @@ RemoteExplain(Task *task, ExplainState *es) remotePlan = (RemoteExplainPlan *) palloc0(sizeof(RemoteExplainPlan)); explainQuery = BuildRemoteExplainQuery(task->queryString, es); + /* + * Use a coordinated transaction to ensure that we open a transaction block + * such that we can set a savepoint. + */ + BeginOrContinueCoordinatedTransaction(); + for (placementIndex = 0; placementIndex < placementCount; placementIndex++) { ShardPlacement *taskPlacement = list_nth(taskPlacementList, placementIndex); - char *nodeName = taskPlacement->nodeName; - uint32 nodePort = taskPlacement->nodePort; + MultiConnection *connection = NULL; + PGresult *queryResult = NULL; + int connectionFlags = 0; + int executeResult = 0; remotePlan->placementIndex = placementIndex; - remotePlan->explainOutputList = ExecuteRemoteQuery(nodeName, nodePort, - NULL, explainQuery); + + connection = GetPlacementConnection(connectionFlags, taskPlacement, NULL); + + /* try other placements if we fail to connect this one */ + if (PQstatus(connection->pgConn) != CONNECTION_OK) + { + continue; + } + + RemoteTransactionBeginIfNecessary(connection); + + /* + * Start a savepoint for the explain query. After running the explain + * query, we will rollback to this savepoint. This saves us from side + * effects of EXPLAIN ANALYZE on DML queries. + */ + ExecuteCriticalRemoteCommand(connection, "SAVEPOINT citus_explain_savepoint"); + + /* run explain query */ + executeResult = ExecuteOptionalRemoteCommand(connection, explainQuery->data, + &queryResult); + if (executeResult != 0) + { + PQclear(queryResult); + ForgetResults(connection); + + continue; + } + + /* read explain query results */ + remotePlan->explainOutputList = ReadFirstColumnAsText(queryResult); + + PQclear(queryResult); + ForgetResults(connection); + + /* rollback to the savepoint */ + ExecuteCriticalRemoteCommand(connection, + "ROLLBACK TO SAVEPOINT citus_explain_savepoint"); + if (remotePlan->explainOutputList != NIL) { break; diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 178dc2f7a..3799d9d97 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -44,6 +44,7 @@ extern int SendRemoteCommand(MultiConnection *connection, const char *command); extern int SendRemoteCommandParams(MultiConnection *connection, const char *command, int parameterCount, const Oid *parameterTypes, const char *const *parameterValues); +extern List * ReadFirstColumnAsText(struct pg_result *queryResult); extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts); diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index e3ae6bfd3..2d8193b6a 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -669,3 +669,20 @@ Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0) Node: host=localhost port=57637 dbname=regression -> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem (cost=0.28..11.83 rows=3 width=5) Index Cond: (l_orderkey = 5) +-- test explain in a transaction with alter table to test we use right connections +BEGIN; +CREATE TABLE explain_table(id int); +SELECT create_distributed_table('explain_table', 'id'); + +ALTER TABLE explain_table ADD COLUMN value int; +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' +EXPLAIN (COSTS FALSE) SELECT value FROM explain_table WHERE id = 1; +Custom Scan (Citus Router) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Seq Scan on explain_table_570001 explain_table + Filter: (id = 1) +ROLLBACK; diff --git a/src/test/regress/expected/multi_explain_0.out b/src/test/regress/expected/multi_explain_0.out index c7f7166a3..10798f526 100644 --- a/src/test/regress/expected/multi_explain_0.out +++ b/src/test/regress/expected/multi_explain_0.out @@ -640,3 +640,20 @@ Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0) Node: host=localhost port=57637 dbname=regression -> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem (cost=0.28..11.83 rows=3 width=5) Index Cond: (l_orderkey = 5) +-- test explain in a transaction with alter table to test we use right connections +BEGIN; +CREATE TABLE explain_table(id int); +SELECT create_distributed_table('explain_table', 'id'); + +ALTER TABLE explain_table ADD COLUMN value int; +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' +EXPLAIN (COSTS FALSE) SELECT value FROM explain_table WHERE id = 1; +Custom Scan (Citus Router) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Seq Scan on explain_table_570001 explain_table + Filter: (id = 1) +ROLLBACK; diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql index 1d6e94db2..e744b1f31 100644 --- a/src/test/regress/sql/multi_explain.sql +++ b/src/test/regress/sql/multi_explain.sql @@ -235,3 +235,15 @@ EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query; -- at least make sure to fail without crashing PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1; EXPLAIN EXECUTE router_executor_query_param(5); + +-- test explain in a transaction with alter table to test we use right connections +BEGIN; + +CREATE TABLE explain_table(id int); +SELECT create_distributed_table('explain_table', 'id'); + +ALTER TABLE explain_table ADD COLUMN value int; + +EXPLAIN (COSTS FALSE) SELECT value FROM explain_table WHERE id = 1; + +ROLLBACK;