diff --git a/src/backend/distributed/commands/citus_global_signal.c b/src/backend/distributed/commands/citus_global_signal.c index 64bb67f0d..8f33f91b7 100644 --- a/src/backend/distributed/commands/citus_global_signal.c +++ b/src/backend/distributed/commands/citus_global_signal.c @@ -15,6 +15,7 @@ #include "distributed/backend_data.h" #include "distributed/metadata_cache.h" +#include "distributed/remote_commands.h" #include "distributed/worker_manager.h" #include "lib/stringinfo.h" #include "signal.h" @@ -111,18 +112,39 @@ CitusSignalBackend(uint64 globalPID, uint64 timeout, int sig) #endif } - StringInfo queryResult = makeStringInfo(); + int connectionFlags = 0; + MultiConnection *connection = GetNodeConnection(connectionFlags, + workerNode->workerName, + workerNode->workerPort); - bool reportResultError = true; - - bool success = ExecuteRemoteQueryOrCommand(workerNode->workerName, - workerNode->workerPort, cancelQuery->data, - queryResult, reportResultError); - - if (success && queryResult && strcmp(queryResult->data, "f") == 0) + if (!SendRemoteCommand(connection, cancelQuery->data)) { + /* if we cannot connect, we warn and report false */ + ReportConnectionError(connection, WARNING); + return false; + } + + bool raiseInterrupts = true; + PGresult *queryResult = GetRemoteCommandResult(connection, raiseInterrupts); + + /* if remote node throws an error, we also throw an error */ + if (!IsResponseOK(queryResult)) + { + ReportResultError(connection, queryResult, ERROR); + } + + StringInfo queryResultString = makeStringInfo(); + bool success = EvaluateSingleQueryResult(connection, queryResult, queryResultString); + if (success && strcmp(queryResultString->data, "f") == 0) + { + /* worker node returned "f" */ success = false; } + PQclear(queryResult); + + bool raiseErrors = false; + ClearResults(connection, raiseErrors); + return success; } diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 7b89b3e96..e0c33b9ce 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -1477,6 +1477,18 @@ IsRebalancerInternalBackend(void) } +/* + * IsCitusRunCommandBackend returns true if we are in a backend that one of + * the run_command_on_* functions initiated. + */ +bool +IsCitusRunCommandBackend(void) +{ + return application_name && + strcmp(application_name, CITUS_RUN_COMMAND_APPLICATION_NAME) == 0; +} + + /* * IsCitusInitiatedRemoteBackend returns true if we are in a backend that citus * initiated via remote connection. diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 4c1aae6bf..2445a69f3 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -1115,3 +1115,92 @@ SendCancelationRequest(MultiConnection *connection) return cancelSent; } + + +/* + * EvaluateSingleQueryResult gets the query result from connection and returns + * true if the query is executed successfully, false otherwise. A query result + * or an error message is returned in queryResultString. The function requires + * that the query returns a single column/single row result. It returns an + * error otherwise. + */ +bool +EvaluateSingleQueryResult(MultiConnection *connection, PGresult *queryResult, + StringInfo queryResultString) +{ + bool success = false; + + ExecStatusType resultStatus = PQresultStatus(queryResult); + if (resultStatus == PGRES_COMMAND_OK) + { + char *commandStatus = PQcmdStatus(queryResult); + appendStringInfo(queryResultString, "%s", commandStatus); + success = true; + } + else if (resultStatus == PGRES_TUPLES_OK) + { + int ntuples = PQntuples(queryResult); + int nfields = PQnfields(queryResult); + + /* error if query returns more than 1 rows, or more than 1 fields */ + if (nfields != 1) + { + appendStringInfo(queryResultString, + "expected a single column in query target"); + } + else if (ntuples > 1) + { + appendStringInfo(queryResultString, + "expected a single row in query result"); + } + else + { + int row = 0; + int column = 0; + if (!PQgetisnull(queryResult, row, column)) + { + char *queryResultValue = PQgetvalue(queryResult, row, column); + appendStringInfo(queryResultString, "%s", queryResultValue); + } + success = true; + } + } + else + { + StoreErrorMessage(connection, queryResultString); + } + + return success; +} + + +/* + * StoreErrorMessage gets the error message from connection and stores it + * in queryResultString. It should be called only when error is present + * otherwise it would return a default error message. + */ +void +StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString) +{ + char *errorMessage = PQerrorMessage(connection->pgConn); + if (errorMessage != NULL) + { + /* copy the error message to a writable memory */ + errorMessage = pnstrdup(errorMessage, strlen(errorMessage)); + + char *firstNewlineIndex = strchr(errorMessage, '\n'); + + /* trim the error message at the line break */ + if (firstNewlineIndex != NULL) + { + *firstNewlineIndex = '\0'; + } + } + else + { + /* put a default error message if no error message is reported */ + errorMessage = "An error occurred while running the query"; + } + + appendStringInfo(queryResultString, "%s", errorMessage); +} diff --git a/src/backend/distributed/operations/citus_tools.c b/src/backend/distributed/operations/citus_tools.c index b7905d9f8..41122655e 100644 --- a/src/backend/distributed/operations/citus_tools.c +++ b/src/backend/distributed/operations/citus_tools.c @@ -29,6 +29,9 @@ #include "utils/builtins.h" +#define SET_APPLICATION_NAME_QUERY \ + "SET application_name TO '" CITUS_RUN_COMMAND_APPLICATION_NAME "'" + PG_FUNCTION_INFO_V1(master_run_on_worker); static int ParseCommandParameters(FunctionCallInfo fcinfo, StringInfo **nodeNameArray, @@ -42,15 +45,15 @@ static void ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int commandCount); static bool GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus, StringInfo queryResultString); -static bool EvaluateQueryResult(MultiConnection *connection, PGresult *queryResult, - StringInfo queryResultString); -static void StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString); static void ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray, int *nodePortArray, StringInfo *commandStringArray, bool *statusArray, StringInfo *resultStringArray, int commandCount); +static bool ExecuteOptionalSingleResultCommand(MultiConnection *connection, + char *queryString, StringInfo + queryResultString); static Tuplestorestate * CreateTupleStore(TupleDesc tupleDescriptor, StringInfo *nodeNameArray, int *nodePortArray, bool *statusArray, @@ -239,18 +242,66 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor FinishConnectionEstablishment(connection); + /* check whether connection attempt was successful */ if (PQstatus(connection->pgConn) != CONNECTION_OK) { appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName, - (int) nodePort); + nodePort); statusArray[commandIndex] = false; + CloseConnection(connection); connectionArray[commandIndex] = NULL; finishedCount++; + continue; } - else + + /* set the application_name to avoid nested execution checks */ + int querySent = SendRemoteCommand(connection, SET_APPLICATION_NAME_QUERY); + if (querySent == 0) { - statusArray[commandIndex] = true; + StoreErrorMessage(connection, queryResultString); + statusArray[commandIndex] = false; + CloseConnection(connection); + connectionArray[commandIndex] = NULL; + finishedCount++; + continue; } + + statusArray[commandIndex] = true; + } + + /* send queries at once */ + for (int commandIndex = 0; commandIndex < commandCount; commandIndex++) + { + MultiConnection *connection = connectionArray[commandIndex]; + if (connection == NULL) + { + continue; + } + + bool raiseInterrupts = true; + PGresult *queryResult = GetRemoteCommandResult(connection, raiseInterrupts); + + /* write the result value or error message to queryResultString */ + StringInfo queryResultString = resultStringArray[commandIndex]; + bool success = EvaluateSingleQueryResult(connection, queryResult, + queryResultString); + if (!success) + { + statusArray[commandIndex] = false; + CloseConnection(connection); + connectionArray[commandIndex] = NULL; + finishedCount++; + continue; + } + + /* clear results for the next command */ + PQclear(queryResult); + + bool raiseErrors = false; + ClearResults(connection, raiseErrors); + + /* we only care about the SET application_name result on failure */ + resetStringInfo(queryResultString); } /* send queries at once */ @@ -357,7 +408,7 @@ GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus, /* query result is available at this point */ PGresult *queryResult = PQgetResult(connection->pgConn); - bool success = EvaluateQueryResult(connection, queryResult, queryResultString); + bool success = EvaluateSingleQueryResult(connection, queryResult, queryResultString); PQclear(queryResult); *resultStatus = success; @@ -366,95 +417,6 @@ GetConnectionStatusAndResult(MultiConnection *connection, bool *resultStatus, } -/* - * EvaluateQueryResult gets the query result from connection and returns - * true if the query is executed successfully, false otherwise. A query result - * or an error message is returned in queryResultString. The function requires - * that the query returns a single column/single row result. It returns an - * error otherwise. - */ -static bool -EvaluateQueryResult(MultiConnection *connection, PGresult *queryResult, - StringInfo queryResultString) -{ - bool success = false; - - ExecStatusType resultStatus = PQresultStatus(queryResult); - if (resultStatus == PGRES_COMMAND_OK) - { - char *commandStatus = PQcmdStatus(queryResult); - appendStringInfo(queryResultString, "%s", commandStatus); - success = true; - } - else if (resultStatus == PGRES_TUPLES_OK) - { - int ntuples = PQntuples(queryResult); - int nfields = PQnfields(queryResult); - - /* error if query returns more than 1 rows, or more than 1 fields */ - if (nfields != 1) - { - appendStringInfo(queryResultString, - "expected a single column in query target"); - } - else if (ntuples > 1) - { - appendStringInfo(queryResultString, - "expected a single row in query result"); - } - else - { - int row = 0; - int column = 0; - if (!PQgetisnull(queryResult, row, column)) - { - char *queryResultValue = PQgetvalue(queryResult, row, column); - appendStringInfo(queryResultString, "%s", queryResultValue); - } - success = true; - } - } - else - { - StoreErrorMessage(connection, queryResultString); - } - - return success; -} - - -/* - * StoreErrorMessage gets the error message from connection and stores it - * in queryResultString. It should be called only when error is present - * otherwise it would return a default error message. - */ -static void -StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString) -{ - char *errorMessage = PQerrorMessage(connection->pgConn); - if (errorMessage != NULL) - { - /* copy the error message to a writable memory */ - errorMessage = pnstrdup(errorMessage, strlen(errorMessage)); - - char *firstNewlineIndex = strchr(errorMessage, '\n'); - - /* trim the error message at the line break */ - if (firstNewlineIndex != NULL) - { - *firstNewlineIndex = '\0'; - } - } - else - { - /* put a default error message if no error message is reported */ - errorMessage = "An error occurred while running the query"; - } - - appendStringInfo(queryResultString, "%s", errorMessage); -} - - /* * ExecuteCommandsAndStoreResults connects to each node specified in * nodeNameArray and nodePortArray, and executes command in commandStringArray @@ -469,63 +431,76 @@ ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray, int *nodePortArray, { for (int commandIndex = 0; commandIndex < commandCount; commandIndex++) { + CHECK_FOR_INTERRUPTS(); + char *nodeName = nodeNameArray[commandIndex]->data; int32 nodePort = nodePortArray[commandIndex]; char *queryString = commandStringArray[commandIndex]->data; StringInfo queryResultString = resultStringArray[commandIndex]; - bool reportResultError = false; - bool success = ExecuteRemoteQueryOrCommand(nodeName, nodePort, queryString, - queryResultString, reportResultError); + int connectionFlags = FORCE_NEW_CONNECTION; + MultiConnection *connection = + GetNodeConnection(connectionFlags, nodeName, nodePort); + + /* set the application_name to avoid nested execution checks */ + bool success = ExecuteOptionalSingleResultCommand(connection, + SET_APPLICATION_NAME_QUERY, + queryResultString); + if (!success) + { + statusArray[commandIndex] = false; + CloseConnection(connection); + continue; + } + + /* we only care about the SET application_name result on failure */ + resetStringInfo(queryResultString); + + /* send the actual query string */ + success = ExecuteOptionalSingleResultCommand(connection, queryString, + queryResultString); statusArray[commandIndex] = success; - - CHECK_FOR_INTERRUPTS(); + CloseConnection(connection); } } /* - * ExecuteRemoteQueryOrCommand executes a query at specified remote node using + * ExecuteOptionalSingleResultCommand executes a query at specified remote node using * the calling user's credentials. The function returns the query status * (success/failure), and query result. The query is expected to return a single * target containing zero or one rows. */ -bool -ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString, - StringInfo queryResultString, bool reportResultError) +static bool +ExecuteOptionalSingleResultCommand(MultiConnection *connection, char *queryString, + StringInfo queryResultString) { - int connectionFlags = FORCE_NEW_CONNECTION; - MultiConnection *connection = - GetNodeConnection(connectionFlags, nodeName, nodePort); - bool raiseInterrupts = true; - if (PQstatus(connection->pgConn) != CONNECTION_OK) { - appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName, - (int) nodePort); + appendStringInfo(queryResultString, "failed to connect to %s:%d", + connection->hostname, connection->port); return false; } if (!SendRemoteCommand(connection, queryString)) { - appendStringInfo(queryResultString, "failed to send query to %s:%d", nodeName, - (int) nodePort); + appendStringInfo(queryResultString, "failed to send query to %s:%d", + connection->hostname, connection->port); return false; } + bool raiseInterrupts = true; PGresult *queryResult = GetRemoteCommandResult(connection, raiseInterrupts); - bool success = EvaluateQueryResult(connection, queryResult, queryResultString); - if (!success && reportResultError) - { - ReportResultError(connection, queryResult, ERROR); - } + /* write the result value or error message to queryResultString */ + bool success = EvaluateSingleQueryResult(connection, queryResult, queryResultString); + /* clear result and close the connection */ PQclear(queryResult); - /* close the connection */ - CloseConnection(connection); + bool raiseErrors = false; + ClearResults(connection, raiseErrors); return success; } diff --git a/src/backend/distributed/worker/worker_shard_visibility.c b/src/backend/distributed/worker/worker_shard_visibility.c index e482a955c..fe7fe5143 100644 --- a/src/backend/distributed/worker/worker_shard_visibility.c +++ b/src/backend/distributed/worker/worker_shard_visibility.c @@ -358,7 +358,8 @@ ShouldHideShardsInternal(void) return false; } - if (IsCitusInternalBackend() || IsRebalancerInternalBackend()) + if (IsCitusInternalBackend() || IsRebalancerInternalBackend() || + IsCitusRunCommandBackend()) { /* we never hide shards from Citus */ return false; diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h index ccb4da535..036943c7c 100644 --- a/src/include/distributed/backend_data.h +++ b/src/include/distributed/backend_data.h @@ -70,10 +70,6 @@ extern int GetExternalClientBackendCount(void); extern uint32 IncrementExternalClientBackendCounter(void); extern void DecrementExternalClientBackendCounter(void); -extern bool ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, - char *queryString, StringInfo queryResultString, - bool reportResultError); - #define INVALID_CITUS_INTERNAL_BACKEND_GPID 0 #define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999 diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 2e31bc9da..7f47ffc62 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -35,6 +35,9 @@ /* application name used for internal connections in rebalancer */ #define CITUS_REBALANCER_NAME "citus_rebalancer" +/* application name used for connections made by run_command_on_* */ +#define CITUS_RUN_COMMAND_APPLICATION_NAME "citus_run_command" + /* deal with waiteventset errors */ #define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1 #define WAIT_EVENT_SET_INDEX_FAILED -2 @@ -287,6 +290,7 @@ extern void ClaimConnectionExclusively(MultiConnection *connection); extern void UnclaimConnection(MultiConnection *connection); extern bool IsCitusInternalBackend(void); extern bool IsRebalancerInternalBackend(void); +extern bool IsCitusRunCommandBackend(void); extern void MarkConnectionConnected(MultiConnection *connection); /* waiteventset utilities */ diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 7e2c4852f..93d4f6bfd 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -66,4 +66,8 @@ extern void WaitForAllConnections(List *connectionList, bool raiseInterrupts); extern bool SendCancelationRequest(MultiConnection *connection); +extern bool EvaluateSingleQueryResult(MultiConnection *connection, PGresult *queryResult, + StringInfo queryResultString); +extern void StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString); + #endif /* REMOTE_COMMAND_H */ diff --git a/src/test/regress/expected/failure_multi_dml.out b/src/test/regress/expected/failure_multi_dml.out index 527255bdd..612b441c7 100644 --- a/src/test/regress/expected/failure_multi_dml.out +++ b/src/test/regress/expected/failure_multi_dml.out @@ -218,6 +218,12 @@ COMMIT; '], false ); +WARNING: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +connection not open +CONTEXT: while executing command on localhost:xxxxx +while executing command on localhost:xxxxx master_run_on_worker --------------------------------------------------------------------- (localhost,57636,t,BEGIN) diff --git a/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out index a2792d0c4..ffaf739c6 100644 --- a/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out +++ b/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out @@ -91,7 +91,7 @@ step s1-update-ref-table: update ref_table set a = a + 1; step s2-sleep: - SELECT pg_sleep(0.5); + SELECT pg_sleep(0.5); pg_sleep --------------------------------------------------------------------- @@ -112,9 +112,10 @@ query |state |wait_event_type|wa (2 rows) step s2-view-worker: - SELECT query, state, wait_event_type, wait_event, usename, datname + SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_stat_activity WHERE query NOT ILIKE ALL(VALUES + ('%application_name%'), ('%pg_prepared_xacts%'), ('%COMMIT%'), ('%dump_local_%'), @@ -168,9 +169,9 @@ step s1-update-ref-table: update ref_table set a = a + 1; step s2-active-transactions: - -- Admin should be able to see all transactions - SELECT count(*) FROM get_all_active_transactions() WHERE transaction_number != 0; - SELECT count(*) FROM get_global_active_transactions() WHERE transaction_number != 0; + -- Admin should be able to see all transactions + SELECT count(*) FROM get_all_active_transactions() WHERE transaction_number != 0; + SELECT count(*) FROM get_global_active_transactions() WHERE transaction_number != 0; count --------------------------------------------------------------------- diff --git a/src/test/regress/expected/run_command_on_all_nodes.out b/src/test/regress/expected/run_command_on_all_nodes.out index 3e37b7c60..76c42ad23 100644 --- a/src/test/regress/expected/run_command_on_all_nodes.out +++ b/src/test/regress/expected/run_command_on_all_nodes.out @@ -54,6 +54,7 @@ SELECT tablename FROM pg_tables WHERE schemaname = 'run_command_on_all_nodes'; (1 row) \c - - - :master_port +SET search_path TO run_command_on_all_nodes; SELECT result FROM run_command_on_all_nodes('SELECT tablename FROM pg_tables WHERE schemaname = ''run_command_on_all_nodes'';'); result --------------------------------------------------------------------- @@ -62,6 +63,13 @@ SELECT result FROM run_command_on_all_nodes('SELECT tablename FROM pg_tables WHE tbl (3 rows) +CREATE TABLE test (x int, y int); +SELECT create_distributed_table('test','x'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + -- break a node and check messages BEGIN; SELECT nodeid AS worker_1_nodeid FROM pg_dist_node WHERE nodeport = :worker_1_port \gset @@ -127,6 +135,24 @@ SELECT success, result FROM run_command_on_all_nodes($$select result from run_co t | 57636 (3 rows) +SELECT success, result FROM run_command_on_all_nodes($$select result from run_command_on_coordinator('select count(*) from run_command_on_all_nodes.test')$$); + success | result +--------------------------------------------------------------------- + t | 0 + t | 0 + t | 0 +(3 rows) + +\c - - - :worker_1_port +-- poor man's DDL from worker +select result from run_command_on_coordinator($$create index on run_command_on_all_nodes.test (x)$$); + result +--------------------------------------------------------------------- + CREATE INDEX +(1 row) + +\c - - - :master_port +-- remove coordinator from metadata to restore pre-test situation SELECT citus_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0; citus_remove_node --------------------------------------------------------------------- @@ -141,5 +167,41 @@ ERROR: the coordinator is not added to the metadata HINT: Add the node as a coordinator by using: SELECT citus_set_coordinator_host('') CONTEXT: PL/pgSQL function run_command_on_coordinator(text,boolean) line XX at RAISE ROLLBACK; +-- check that we can do distributed queries from worker nodes +SELECT success, result FROM run_command_on_all_nodes($$insert into run_command_on_all_nodes.test values (1,2)$$, true); + success | result +--------------------------------------------------------------------- + t | INSERT 0 1 + t | INSERT 0 1 + t | INSERT 0 1 +(3 rows) + +SELECT success, result FROM run_command_on_all_nodes($$insert into run_command_on_all_nodes.test values (1,2)$$, false); + success | result +--------------------------------------------------------------------- + t | INSERT 0 1 + t | INSERT 0 1 + t | INSERT 0 1 +(3 rows) + +SELECT success, result FROM run_command_on_all_nodes($$select count(*) from run_command_on_all_nodes.test$$); + success | result +--------------------------------------------------------------------- + t | 6 + t | 6 + t | 6 +(3 rows) + +-- ddl commands are only allowed from the coordinator +SELECT success, result FROM run_command_on_all_nodes($$create index on run_command_on_all_nodes.test (x)$$); + success | result +--------------------------------------------------------------------- + f | ERROR: operation is not allowed on this node + f | ERROR: operation is not allowed on this node + t | CREATE INDEX +(3 rows) + DROP SCHEMA run_command_on_all_nodes CASCADE; -NOTICE: drop cascades to table run_command_on_all_nodes.tbl +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table run_command_on_all_nodes.tbl +drop cascades to table run_command_on_all_nodes.test diff --git a/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec b/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec index f55f869dd..11f260bfd 100644 --- a/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec +++ b/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec @@ -89,6 +89,7 @@ step "s2-view-worker" SELECT query, state, wait_event_type, wait_event, usename, datname FROM citus_stat_activity WHERE query NOT ILIKE ALL(VALUES + ('%application_name%'), ('%pg_prepared_xacts%'), ('%COMMIT%'), ('%dump_local_%'), diff --git a/src/test/regress/sql/run_command_on_all_nodes.sql b/src/test/regress/sql/run_command_on_all_nodes.sql index 2f6327ce2..0004a74e7 100644 --- a/src/test/regress/sql/run_command_on_all_nodes.sql +++ b/src/test/regress/sql/run_command_on_all_nodes.sql @@ -27,8 +27,13 @@ SELECT tablename FROM pg_tables WHERE schemaname = 'run_command_on_all_nodes'; SELECT tablename FROM pg_tables WHERE schemaname = 'run_command_on_all_nodes'; \c - - - :master_port +SET search_path TO run_command_on_all_nodes; + SELECT result FROM run_command_on_all_nodes('SELECT tablename FROM pg_tables WHERE schemaname = ''run_command_on_all_nodes'';'); +CREATE TABLE test (x int, y int); +SELECT create_distributed_table('test','x'); + -- break a node and check messages BEGIN; SELECT nodeid AS worker_1_nodeid FROM pg_dist_node WHERE nodeport = :worker_1_port \gset @@ -57,6 +62,12 @@ SELECT success, result FROM run_command_on_all_nodes($$select result from run_co -- we can use run_command_on_coordinator from any node if the coordinator is in the metadata SELECT citus_set_coordinator_host('localhost'); SELECT success, result FROM run_command_on_all_nodes($$select result from run_command_on_coordinator('select inet_server_port()')$$); +SELECT success, result FROM run_command_on_all_nodes($$select result from run_command_on_coordinator('select count(*) from run_command_on_all_nodes.test')$$); +\c - - - :worker_1_port +-- poor man's DDL from worker +select result from run_command_on_coordinator($$create index on run_command_on_all_nodes.test (x)$$); +\c - - - :master_port +-- remove coordinator from metadata to restore pre-test situation SELECT citus_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0; -- check that we fail when pg_dist_node is empty @@ -65,4 +76,12 @@ DELETE FROM pg_dist_node; SELECT success, result FROM run_command_on_coordinator('select inet_server_port()'); ROLLBACK; +-- check that we can do distributed queries from worker nodes +SELECT success, result FROM run_command_on_all_nodes($$insert into run_command_on_all_nodes.test values (1,2)$$, true); +SELECT success, result FROM run_command_on_all_nodes($$insert into run_command_on_all_nodes.test values (1,2)$$, false); +SELECT success, result FROM run_command_on_all_nodes($$select count(*) from run_command_on_all_nodes.test$$); + +-- ddl commands are only allowed from the coordinator +SELECT success, result FROM run_command_on_all_nodes($$create index on run_command_on_all_nodes.test (x)$$); + DROP SCHEMA run_command_on_all_nodes CASCADE;