Allow distributed execution from run_command_on_* functions

release-11.0-onder-23-may
Marco Slot 2022-05-19 12:29:41 +02:00
parent 8229d4b7ee
commit 73fd4f7ded
13 changed files with 332 additions and 140 deletions

View File

@ -15,6 +15,7 @@
#include "distributed/backend_data.h" #include "distributed/backend_data.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "signal.h" #include "signal.h"
@ -111,18 +112,39 @@ CitusSignalBackend(uint64 globalPID, uint64 timeout, int sig)
#endif #endif
} }
StringInfo queryResult = makeStringInfo(); int connectionFlags = 0;
MultiConnection *connection = GetNodeConnection(connectionFlags,
workerNode->workerName,
workerNode->workerPort);
bool reportResultError = true; if (!SendRemoteCommand(connection, cancelQuery->data))
bool success = ExecuteRemoteQueryOrCommand(workerNode->workerName,
workerNode->workerPort, cancelQuery->data,
queryResult, reportResultError);
if (success && queryResult && strcmp(queryResult->data, "f") == 0)
{ {
/* if we cannot connect, we warn and report false */
ReportConnectionError(connection, WARNING);
return false;
}
bool raiseInterrupts = true;
PGresult *queryResult = GetRemoteCommandResult(connection, raiseInterrupts);
/* if remote node throws an error, we also throw an error */
if (!IsResponseOK(queryResult))
{
ReportResultError(connection, queryResult, ERROR);
}
StringInfo queryResultString = makeStringInfo();
bool success = EvaluateSingleQueryResult(connection, queryResult, queryResultString);
if (success && strcmp(queryResultString->data, "f") == 0)
{
/* worker node returned "f" */
success = false; success = false;
} }
PQclear(queryResult);
bool raiseErrors = false;
ClearResults(connection, raiseErrors);
return success; return success;
} }

View File

@ -1477,6 +1477,18 @@ IsRebalancerInternalBackend(void)
} }
/*
* IsCitusRunCommandBackend returns true if we are in a backend that one of
* the run_command_on_* functions initiated.
*/
bool
IsCitusRunCommandBackend(void)
{
return application_name &&
strcmp(application_name, CITUS_RUN_COMMAND_APPLICATION_NAME) == 0;
}
/* /*
* IsCitusInitiatedRemoteBackend returns true if we are in a backend that citus * IsCitusInitiatedRemoteBackend returns true if we are in a backend that citus
* initiated via remote connection. * initiated via remote connection.

View File

@ -1115,3 +1115,92 @@ SendCancelationRequest(MultiConnection *connection)
return cancelSent; return cancelSent;
} }
/*
* EvaluateSingleQueryResult gets the query result from connection and returns
* true if the query is executed successfully, false otherwise. A query result
* or an error message is returned in queryResultString. The function requires
* that the query returns a single column/single row result. It returns an
* error otherwise.
*/
bool
EvaluateSingleQueryResult(MultiConnection *connection, PGresult *queryResult,
StringInfo queryResultString)
{
bool success = false;
ExecStatusType resultStatus = PQresultStatus(queryResult);
if (resultStatus == PGRES_COMMAND_OK)
{
char *commandStatus = PQcmdStatus(queryResult);
appendStringInfo(queryResultString, "%s", commandStatus);
success = true;
}
else if (resultStatus == PGRES_TUPLES_OK)
{
int ntuples = PQntuples(queryResult);
int nfields = PQnfields(queryResult);
/* error if query returns more than 1 rows, or more than 1 fields */
if (nfields != 1)
{
appendStringInfo(queryResultString,
"expected a single column in query target");
}
else if (ntuples > 1)
{
appendStringInfo(queryResultString,
"expected a single row in query result");
}
else
{
int row = 0;
int column = 0;
if (!PQgetisnull(queryResult, row, column))
{
char *queryResultValue = PQgetvalue(queryResult, row, column);
appendStringInfo(queryResultString, "%s", queryResultValue);
}
success = true;
}
}
else
{
StoreErrorMessage(connection, queryResultString);
}
return success;
}
/*
* StoreErrorMessage gets the error message from connection and stores it
* in queryResultString. It should be called only when error is present
* otherwise it would return a default error message.
*/
void
StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString)
{
char *errorMessage = PQerrorMessage(connection->pgConn);
if (errorMessage != NULL)
{
/* copy the error message to a writable memory */
errorMessage = pnstrdup(errorMessage, strlen(errorMessage));
char *firstNewlineIndex = strchr(errorMessage, '\n');
/* trim the error message at the line break */
if (firstNewlineIndex != NULL)
{
*firstNewlineIndex = '\0';
}
}
else
{
/* put a default error message if no error message is reported */
errorMessage = "An error occurred while running the query";
}
appendStringInfo(queryResultString, "%s", errorMessage);
}

View File

@ -29,6 +29,9 @@
#include "utils/builtins.h" #include "utils/builtins.h"
#define SET_APPLICATION_NAME_QUERY \
"SET application_name TO '" CITUS_RUN_COMMAND_APPLICATION_NAME "'"
PG_FUNCTION_INFO_V1(master_run_on_worker); PG_FUNCTION_INFO_V1(master_run_on_worker);
static int ParseCommandParameters(FunctionCallInfo fcinfo, StringInfo **nodeNameArray, static int ParseCommandParameters(FunctionCallInfo fcinfo, StringInfo **nodeNameArray,
@ -42,15 +45,15 @@ static void ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray,
int commandCount); int commandCount);
static bool GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus, static bool GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus,
StringInfo queryResultString); StringInfo queryResultString);
static bool EvaluateQueryResult(MultiConnection *connection, PGresult *queryResult,
StringInfo queryResultString);
static void StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString);
static void ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray, static void ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray,
int *nodePortArray, int *nodePortArray,
StringInfo *commandStringArray, StringInfo *commandStringArray,
bool *statusArray, bool *statusArray,
StringInfo *resultStringArray, StringInfo *resultStringArray,
int commandCount); int commandCount);
static bool ExecuteOptionalSingleResultCommand(MultiConnection *connection,
char *queryString, StringInfo
queryResultString);
static Tuplestorestate * CreateTupleStore(TupleDesc tupleDescriptor, static Tuplestorestate * CreateTupleStore(TupleDesc tupleDescriptor,
StringInfo *nodeNameArray, int *nodePortArray, StringInfo *nodeNameArray, int *nodePortArray,
bool *statusArray, bool *statusArray,
@ -239,18 +242,66 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
FinishConnectionEstablishment(connection); FinishConnectionEstablishment(connection);
/* check whether connection attempt was successful */
if (PQstatus(connection->pgConn) != CONNECTION_OK) if (PQstatus(connection->pgConn) != CONNECTION_OK)
{ {
appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName, appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName,
(int) nodePort); nodePort);
statusArray[commandIndex] = false; statusArray[commandIndex] = false;
CloseConnection(connection);
connectionArray[commandIndex] = NULL; connectionArray[commandIndex] = NULL;
finishedCount++; finishedCount++;
continue;
} }
else
/* set the application_name to avoid nested execution checks */
int querySent = SendRemoteCommand(connection, SET_APPLICATION_NAME_QUERY);
if (querySent == 0)
{ {
statusArray[commandIndex] = true; StoreErrorMessage(connection, queryResultString);
statusArray[commandIndex] = false;
CloseConnection(connection);
connectionArray[commandIndex] = NULL;
finishedCount++;
continue;
} }
statusArray[commandIndex] = true;
}
/* send queries at once */
for (int commandIndex = 0; commandIndex < commandCount; commandIndex++)
{
MultiConnection *connection = connectionArray[commandIndex];
if (connection == NULL)
{
continue;
}
bool raiseInterrupts = true;
PGresult *queryResult = GetRemoteCommandResult(connection, raiseInterrupts);
/* write the result value or error message to queryResultString */
StringInfo queryResultString = resultStringArray[commandIndex];
bool success = EvaluateSingleQueryResult(connection, queryResult,
queryResultString);
if (!success)
{
statusArray[commandIndex] = false;
CloseConnection(connection);
connectionArray[commandIndex] = NULL;
finishedCount++;
continue;
}
/* clear results for the next command */
PQclear(queryResult);
bool raiseErrors = false;
ClearResults(connection, raiseErrors);
/* we only care about the SET application_name result on failure */
resetStringInfo(queryResultString);
} }
/* send queries at once */ /* send queries at once */
@ -357,7 +408,7 @@ GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus,
/* query result is available at this point */ /* query result is available at this point */
PGresult *queryResult = PQgetResult(connection->pgConn); PGresult *queryResult = PQgetResult(connection->pgConn);
bool success = EvaluateQueryResult(connection, queryResult, queryResultString); bool success = EvaluateSingleQueryResult(connection, queryResult, queryResultString);
PQclear(queryResult); PQclear(queryResult);
*resultStatus = success; *resultStatus = success;
@ -366,95 +417,6 @@ GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus,
} }
/*
* EvaluateQueryResult gets the query result from connection and returns
* true if the query is executed successfully, false otherwise. A query result
* or an error message is returned in queryResultString. The function requires
* that the query returns a single column/single row result. It returns an
* error otherwise.
*/
static bool
EvaluateQueryResult(MultiConnection *connection, PGresult *queryResult,
StringInfo queryResultString)
{
bool success = false;
ExecStatusType resultStatus = PQresultStatus(queryResult);
if (resultStatus == PGRES_COMMAND_OK)
{
char *commandStatus = PQcmdStatus(queryResult);
appendStringInfo(queryResultString, "%s", commandStatus);
success = true;
}
else if (resultStatus == PGRES_TUPLES_OK)
{
int ntuples = PQntuples(queryResult);
int nfields = PQnfields(queryResult);
/* error if query returns more than 1 rows, or more than 1 fields */
if (nfields != 1)
{
appendStringInfo(queryResultString,
"expected a single column in query target");
}
else if (ntuples > 1)
{
appendStringInfo(queryResultString,
"expected a single row in query result");
}
else
{
int row = 0;
int column = 0;
if (!PQgetisnull(queryResult, row, column))
{
char *queryResultValue = PQgetvalue(queryResult, row, column);
appendStringInfo(queryResultString, "%s", queryResultValue);
}
success = true;
}
}
else
{
StoreErrorMessage(connection, queryResultString);
}
return success;
}
/*
* StoreErrorMessage gets the error message from connection and stores it
* in queryResultString. It should be called only when error is present
* otherwise it would return a default error message.
*/
static void
StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString)
{
char *errorMessage = PQerrorMessage(connection->pgConn);
if (errorMessage != NULL)
{
/* copy the error message to a writable memory */
errorMessage = pnstrdup(errorMessage, strlen(errorMessage));
char *firstNewlineIndex = strchr(errorMessage, '\n');
/* trim the error message at the line break */
if (firstNewlineIndex != NULL)
{
*firstNewlineIndex = '\0';
}
}
else
{
/* put a default error message if no error message is reported */
errorMessage = "An error occurred while running the query";
}
appendStringInfo(queryResultString, "%s", errorMessage);
}
/* /*
* ExecuteCommandsAndStoreResults connects to each node specified in * ExecuteCommandsAndStoreResults connects to each node specified in
* nodeNameArray and nodePortArray, and executes command in commandStringArray * nodeNameArray and nodePortArray, and executes command in commandStringArray
@ -469,63 +431,76 @@ ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray, int *nodePortArray,
{ {
for (int commandIndex = 0; commandIndex < commandCount; commandIndex++) for (int commandIndex = 0; commandIndex < commandCount; commandIndex++)
{ {
CHECK_FOR_INTERRUPTS();
char *nodeName = nodeNameArray[commandIndex]->data; char *nodeName = nodeNameArray[commandIndex]->data;
int32 nodePort = nodePortArray[commandIndex]; int32 nodePort = nodePortArray[commandIndex];
char *queryString = commandStringArray[commandIndex]->data; char *queryString = commandStringArray[commandIndex]->data;
StringInfo queryResultString = resultStringArray[commandIndex]; StringInfo queryResultString = resultStringArray[commandIndex];
bool reportResultError = false;
bool success = ExecuteRemoteQueryOrCommand(nodeName, nodePort, queryString, int connectionFlags = FORCE_NEW_CONNECTION;
queryResultString, reportResultError); MultiConnection *connection =
GetNodeConnection(connectionFlags, nodeName, nodePort);
/* set the application_name to avoid nested execution checks */
bool success = ExecuteOptionalSingleResultCommand(connection,
SET_APPLICATION_NAME_QUERY,
queryResultString);
if (!success)
{
statusArray[commandIndex] = false;
CloseConnection(connection);
continue;
}
/* we only care about the SET application_name result on failure */
resetStringInfo(queryResultString);
/* send the actual query string */
success = ExecuteOptionalSingleResultCommand(connection, queryString,
queryResultString);
statusArray[commandIndex] = success; statusArray[commandIndex] = success;
CloseConnection(connection);
CHECK_FOR_INTERRUPTS();
} }
} }
/* /*
* ExecuteRemoteQueryOrCommand executes a query at specified remote node using * ExecuteOptionalSingleResultCommand executes a query at specified remote node using
* the calling user's credentials. The function returns the query status * the calling user's credentials. The function returns the query status
* (success/failure), and query result. The query is expected to return a single * (success/failure), and query result. The query is expected to return a single
* target containing zero or one rows. * target containing zero or one rows.
*/ */
bool static bool
ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString, ExecuteOptionalSingleResultCommand(MultiConnection *connection, char *queryString,
StringInfo queryResultString, bool reportResultError) StringInfo queryResultString)
{ {
int connectionFlags = FORCE_NEW_CONNECTION;
MultiConnection *connection =
GetNodeConnection(connectionFlags, nodeName, nodePort);
bool raiseInterrupts = true;
if (PQstatus(connection->pgConn) != CONNECTION_OK) if (PQstatus(connection->pgConn) != CONNECTION_OK)
{ {
appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName, appendStringInfo(queryResultString, "failed to connect to %s:%d",
(int) nodePort); connection->hostname, connection->port);
return false; return false;
} }
if (!SendRemoteCommand(connection, queryString)) if (!SendRemoteCommand(connection, queryString))
{ {
appendStringInfo(queryResultString, "failed to send query to %s:%d", nodeName, appendStringInfo(queryResultString, "failed to send query to %s:%d",
(int) nodePort); connection->hostname, connection->port);
return false; return false;
} }
bool raiseInterrupts = true;
PGresult *queryResult = GetRemoteCommandResult(connection, raiseInterrupts); PGresult *queryResult = GetRemoteCommandResult(connection, raiseInterrupts);
bool success = EvaluateQueryResult(connection, queryResult, queryResultString);
if (!success && reportResultError) /* write the result value or error message to queryResultString */
{ bool success = EvaluateSingleQueryResult(connection, queryResult, queryResultString);
ReportResultError(connection, queryResult, ERROR);
}
/* clear result and close the connection */
PQclear(queryResult); PQclear(queryResult);
/* close the connection */ bool raiseErrors = false;
CloseConnection(connection); ClearResults(connection, raiseErrors);
return success; return success;
} }

View File

@ -358,7 +358,8 @@ ShouldHideShardsInternal(void)
return false; return false;
} }
if (IsCitusInternalBackend() || IsRebalancerInternalBackend()) if (IsCitusInternalBackend() || IsRebalancerInternalBackend() ||
IsCitusRunCommandBackend())
{ {
/* we never hide shards from Citus */ /* we never hide shards from Citus */
return false; return false;

View File

@ -70,10 +70,6 @@ extern int GetExternalClientBackendCount(void);
extern uint32 IncrementExternalClientBackendCounter(void); extern uint32 IncrementExternalClientBackendCounter(void);
extern void DecrementExternalClientBackendCounter(void); extern void DecrementExternalClientBackendCounter(void);
extern bool ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort,
char *queryString, StringInfo queryResultString,
bool reportResultError);
#define INVALID_CITUS_INTERNAL_BACKEND_GPID 0 #define INVALID_CITUS_INTERNAL_BACKEND_GPID 0
#define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999 #define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999

View File

@ -35,6 +35,9 @@
/* application name used for internal connections in rebalancer */ /* application name used for internal connections in rebalancer */
#define CITUS_REBALANCER_NAME "citus_rebalancer" #define CITUS_REBALANCER_NAME "citus_rebalancer"
/* application name used for connections made by run_command_on_* */
#define CITUS_RUN_COMMAND_APPLICATION_NAME "citus_run_command"
/* deal with waiteventset errors */ /* deal with waiteventset errors */
#define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1 #define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1
#define WAIT_EVENT_SET_INDEX_FAILED -2 #define WAIT_EVENT_SET_INDEX_FAILED -2
@ -287,6 +290,7 @@ extern void ClaimConnectionExclusively(MultiConnection *connection);
extern void UnclaimConnection(MultiConnection *connection); extern void UnclaimConnection(MultiConnection *connection);
extern bool IsCitusInternalBackend(void); extern bool IsCitusInternalBackend(void);
extern bool IsRebalancerInternalBackend(void); extern bool IsRebalancerInternalBackend(void);
extern bool IsCitusRunCommandBackend(void);
extern void MarkConnectionConnected(MultiConnection *connection); extern void MarkConnectionConnected(MultiConnection *connection);
/* waiteventset utilities */ /* waiteventset utilities */

View File

@ -66,4 +66,8 @@ extern void WaitForAllConnections(List *connectionList, bool raiseInterrupts);
extern bool SendCancelationRequest(MultiConnection *connection); extern bool SendCancelationRequest(MultiConnection *connection);
extern bool EvaluateSingleQueryResult(MultiConnection *connection, PGresult *queryResult,
StringInfo queryResultString);
extern void StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString);
#endif /* REMOTE_COMMAND_H */ #endif /* REMOTE_COMMAND_H */

View File

@ -218,6 +218,12 @@ COMMIT;
'], '],
false false
); );
WARNING: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
connection not open
CONTEXT: while executing command on localhost:xxxxx
while executing command on localhost:xxxxx
master_run_on_worker master_run_on_worker
--------------------------------------------------------------------- ---------------------------------------------------------------------
(localhost,57636,t,BEGIN) (localhost,57636,t,BEGIN)

View File

@ -91,7 +91,7 @@ step s1-update-ref-table:
update ref_table set a = a + 1; update ref_table set a = a + 1;
step s2-sleep: step s2-sleep:
SELECT pg_sleep(0.5); SELECT pg_sleep(0.5);
pg_sleep pg_sleep
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -112,9 +112,10 @@ query |state |wait_event_type|wa
(2 rows) (2 rows)
step s2-view-worker: step s2-view-worker:
SELECT query, state, wait_event_type, wait_event, usename, datname SELECT query, state, wait_event_type, wait_event, usename, datname
FROM citus_stat_activity FROM citus_stat_activity
WHERE query NOT ILIKE ALL(VALUES WHERE query NOT ILIKE ALL(VALUES
('%application_name%'),
('%pg_prepared_xacts%'), ('%pg_prepared_xacts%'),
('%COMMIT%'), ('%COMMIT%'),
('%dump_local_%'), ('%dump_local_%'),
@ -168,9 +169,9 @@ step s1-update-ref-table:
update ref_table set a = a + 1; update ref_table set a = a + 1;
step s2-active-transactions: step s2-active-transactions:
-- Admin should be able to see all transactions -- Admin should be able to see all transactions
SELECT count(*) FROM get_all_active_transactions() WHERE transaction_number != 0; SELECT count(*) FROM get_all_active_transactions() WHERE transaction_number != 0;
SELECT count(*) FROM get_global_active_transactions() WHERE transaction_number != 0; SELECT count(*) FROM get_global_active_transactions() WHERE transaction_number != 0;
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -54,6 +54,7 @@ SELECT tablename FROM pg_tables WHERE schemaname = 'run_command_on_all_nodes';
(1 row) (1 row)
\c - - - :master_port \c - - - :master_port
SET search_path TO run_command_on_all_nodes;
SELECT result FROM run_command_on_all_nodes('SELECT tablename FROM pg_tables WHERE schemaname = ''run_command_on_all_nodes'';'); SELECT result FROM run_command_on_all_nodes('SELECT tablename FROM pg_tables WHERE schemaname = ''run_command_on_all_nodes'';');
result result
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -62,6 +63,13 @@ SELECT result FROM run_command_on_all_nodes('SELECT tablename FROM pg_tables WHE
tbl tbl
(3 rows) (3 rows)
CREATE TABLE test (x int, y int);
SELECT create_distributed_table('test','x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- break a node and check messages -- break a node and check messages
BEGIN; BEGIN;
SELECT nodeid AS worker_1_nodeid FROM pg_dist_node WHERE nodeport = :worker_1_port \gset SELECT nodeid AS worker_1_nodeid FROM pg_dist_node WHERE nodeport = :worker_1_port \gset
@ -127,6 +135,24 @@ SELECT success, result FROM run_command_on_all_nodes($$select result from run_co
t | 57636 t | 57636
(3 rows) (3 rows)
SELECT success, result FROM run_command_on_all_nodes($$select result from run_command_on_coordinator('select count(*) from run_command_on_all_nodes.test')$$);
success | result
---------------------------------------------------------------------
t | 0
t | 0
t | 0
(3 rows)
\c - - - :worker_1_port
-- poor man's DDL from worker
select result from run_command_on_coordinator($$create index on run_command_on_all_nodes.test (x)$$);
result
---------------------------------------------------------------------
CREATE INDEX
(1 row)
\c - - - :master_port
-- remove coordinator from metadata to restore pre-test situation
SELECT citus_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0; SELECT citus_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0;
citus_remove_node citus_remove_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -141,5 +167,41 @@ ERROR: the coordinator is not added to the metadata
HINT: Add the node as a coordinator by using: SELECT citus_set_coordinator_host('<hostname>') HINT: Add the node as a coordinator by using: SELECT citus_set_coordinator_host('<hostname>')
CONTEXT: PL/pgSQL function run_command_on_coordinator(text,boolean) line XX at RAISE CONTEXT: PL/pgSQL function run_command_on_coordinator(text,boolean) line XX at RAISE
ROLLBACK; ROLLBACK;
-- check that we can do distributed queries from worker nodes
SELECT success, result FROM run_command_on_all_nodes($$insert into run_command_on_all_nodes.test values (1,2)$$, true);
success | result
---------------------------------------------------------------------
t | INSERT 0 1
t | INSERT 0 1
t | INSERT 0 1
(3 rows)
SELECT success, result FROM run_command_on_all_nodes($$insert into run_command_on_all_nodes.test values (1,2)$$, false);
success | result
---------------------------------------------------------------------
t | INSERT 0 1
t | INSERT 0 1
t | INSERT 0 1
(3 rows)
SELECT success, result FROM run_command_on_all_nodes($$select count(*) from run_command_on_all_nodes.test$$);
success | result
---------------------------------------------------------------------
t | 6
t | 6
t | 6
(3 rows)
-- ddl commands are only allowed from the coordinator
SELECT success, result FROM run_command_on_all_nodes($$create index on run_command_on_all_nodes.test (x)$$);
success | result
---------------------------------------------------------------------
f | ERROR: operation is not allowed on this node
f | ERROR: operation is not allowed on this node
t | CREATE INDEX
(3 rows)
DROP SCHEMA run_command_on_all_nodes CASCADE; DROP SCHEMA run_command_on_all_nodes CASCADE;
NOTICE: drop cascades to table run_command_on_all_nodes.tbl NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table run_command_on_all_nodes.tbl
drop cascades to table run_command_on_all_nodes.test

View File

@ -89,6 +89,7 @@ step "s2-view-worker"
SELECT query, state, wait_event_type, wait_event, usename, datname SELECT query, state, wait_event_type, wait_event, usename, datname
FROM citus_stat_activity FROM citus_stat_activity
WHERE query NOT ILIKE ALL(VALUES WHERE query NOT ILIKE ALL(VALUES
('%application_name%'),
('%pg_prepared_xacts%'), ('%pg_prepared_xacts%'),
('%COMMIT%'), ('%COMMIT%'),
('%dump_local_%'), ('%dump_local_%'),

View File

@ -27,8 +27,13 @@ SELECT tablename FROM pg_tables WHERE schemaname = 'run_command_on_all_nodes';
SELECT tablename FROM pg_tables WHERE schemaname = 'run_command_on_all_nodes'; SELECT tablename FROM pg_tables WHERE schemaname = 'run_command_on_all_nodes';
\c - - - :master_port \c - - - :master_port
SET search_path TO run_command_on_all_nodes;
SELECT result FROM run_command_on_all_nodes('SELECT tablename FROM pg_tables WHERE schemaname = ''run_command_on_all_nodes'';'); SELECT result FROM run_command_on_all_nodes('SELECT tablename FROM pg_tables WHERE schemaname = ''run_command_on_all_nodes'';');
CREATE TABLE test (x int, y int);
SELECT create_distributed_table('test','x');
-- break a node and check messages -- break a node and check messages
BEGIN; BEGIN;
SELECT nodeid AS worker_1_nodeid FROM pg_dist_node WHERE nodeport = :worker_1_port \gset SELECT nodeid AS worker_1_nodeid FROM pg_dist_node WHERE nodeport = :worker_1_port \gset
@ -57,6 +62,12 @@ SELECT success, result FROM run_command_on_all_nodes($$select result from run_co
-- we can use run_command_on_coordinator from any node if the coordinator is in the metadata -- we can use run_command_on_coordinator from any node if the coordinator is in the metadata
SELECT citus_set_coordinator_host('localhost'); SELECT citus_set_coordinator_host('localhost');
SELECT success, result FROM run_command_on_all_nodes($$select result from run_command_on_coordinator('select inet_server_port()')$$); SELECT success, result FROM run_command_on_all_nodes($$select result from run_command_on_coordinator('select inet_server_port()')$$);
SELECT success, result FROM run_command_on_all_nodes($$select result from run_command_on_coordinator('select count(*) from run_command_on_all_nodes.test')$$);
\c - - - :worker_1_port
-- poor man's DDL from worker
select result from run_command_on_coordinator($$create index on run_command_on_all_nodes.test (x)$$);
\c - - - :master_port
-- remove coordinator from metadata to restore pre-test situation
SELECT citus_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0; SELECT citus_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0;
-- check that we fail when pg_dist_node is empty -- check that we fail when pg_dist_node is empty
@ -65,4 +76,12 @@ DELETE FROM pg_dist_node;
SELECT success, result FROM run_command_on_coordinator('select inet_server_port()'); SELECT success, result FROM run_command_on_coordinator('select inet_server_port()');
ROLLBACK; ROLLBACK;
-- check that we can do distributed queries from worker nodes
SELECT success, result FROM run_command_on_all_nodes($$insert into run_command_on_all_nodes.test values (1,2)$$, true);
SELECT success, result FROM run_command_on_all_nodes($$insert into run_command_on_all_nodes.test values (1,2)$$, false);
SELECT success, result FROM run_command_on_all_nodes($$select count(*) from run_command_on_all_nodes.test$$);
-- ddl commands are only allowed from the coordinator
SELECT success, result FROM run_command_on_all_nodes($$create index on run_command_on_all_nodes.test (x)$$);
DROP SCHEMA run_command_on_all_nodes CASCADE; DROP SCHEMA run_command_on_all_nodes CASCADE;