From b453f6c7ab79b0577b271daed118e201d0fdad48 Mon Sep 17 00:00:00 2001
From: Murat Tuncer
Date: Fri, 14 Oct 2016 11:30:40 +0300
Subject: [PATCH] Add master_run_on_worker UDF

---
 .../distributed/master/citus_tools.sql       | 285 +++++++++
 .../distributed/master/master_citus_tools.c  | 566 ++++++++++++++++++
 .../regress/expected/multi_citus_tools.out   | 370 ++++++++++++
 src/test/regress/multi_schedule              |   5 +
 src/test/regress/sql/multi_citus_tools.sql   | 216 +++++++
 5 files changed, 1442 insertions(+)
 create mode 100644 src/backend/distributed/master/citus_tools.sql
 create mode 100644 src/backend/distributed/master/master_citus_tools.c
 create mode 100644 src/test/regress/expected/multi_citus_tools.out
 create mode 100644 src/test/regress/sql/multi_citus_tools.sql

diff --git a/src/backend/distributed/master/citus_tools.sql b/src/backend/distributed/master/citus_tools.sql
new file mode 100644
index 000000000..136d71cd8
--- /dev/null
+++ b/src/backend/distributed/master/citus_tools.sql
@@ -0,0 +1,285 @@
+/*
+ * citus_tools.sql
+ *    Contains definitions of citus_tools UDFs
+ *    - citus_run_on_all_workers
+ *    - citus_run_on_all_placements
+ *    - citus_run_on_all_colocated_placements
+ *    - citus_run_on_all_shards
+ *
+ * These functions depend on the presence of the master_run_on_worker UDF.
+ */
+
+CREATE OR REPLACE FUNCTION master_run_on_worker(worker_name text[], port integer[],
+                                                command text[], parallel boolean,
+                                                OUT node_name text, OUT node_port integer,
+                                                OUT success boolean, OUT result text)
+    RETURNS SETOF record
+    LANGUAGE C STABLE STRICT
+    AS 'citus.so', $$master_run_on_worker$$;
+
+
+CREATE TYPE colocation_placement_type AS (
+    shardid1 bigint,
+    shardid2 bigint,
+    nodename text,
+    nodeport bigint
+);
+
+--
+-- citus_tables_colocated returns true if the given tables are co-located,
+-- false otherwise. The function checks shard definitions and matches shard
+-- placements of the given tables.
+-- +CREATE OR REPLACE FUNCTION citus_tables_colocated(table1 regclass, table2 regclass) + RETURNS bool + LANGUAGE plpgsql + AS $function$ +DECLARE + colocated_shard_count int; + table1_shard_count int; + table2_shard_count int; + table1_placement_count int; + table2_placement_count int; + table1_placements colocation_placement_type[]; + table2_placements colocation_placement_type[]; +BEGIN + SELECT count(*), + (SELECT count(*) FROM pg_dist_shard a WHERE a.logicalrelid = table1), + (SELECT count(*) FROM pg_dist_shard b WHERE b.logicalrelid = table2) + INTO colocated_shard_count, table1_shard_count, table2_shard_count + FROM pg_dist_shard tba JOIN pg_dist_shard tbb USING(shardminvalue, shardmaxvalue) + WHERE tba.logicalrelid = table1 AND tbb.logicalrelid = table2; + + IF (table1_shard_count != table2_shard_count OR + table1_shard_count != colocated_shard_count) + THEN + RETURN false; + END IF; + + WITH colocated_shards AS ( + SELECT tba.shardid as shardid1, tbb.shardid as shardid2 + FROM pg_dist_shard tba JOIN pg_dist_shard tbb USING(shardminvalue, shardmaxvalue) + WHERE tba.logicalrelid = table1 AND tbb.logicalrelid = table2), + left_shard_placements AS ( + SELECT cs.shardid1, cs.shardid2, sp.nodename, sp.nodeport + FROM colocated_shards cs JOIN pg_dist_shard_placement sp ON (cs.shardid1 = sp.shardid) + WHERE sp.shardstate = 1) + SELECT + array_agg((lsp.shardid1, lsp.shardid2, lsp.nodename, lsp.nodeport)::colocation_placement_type + ORDER BY shardid1, shardid2, nodename, nodeport), + count(distinct lsp.shardid1) + FROM left_shard_placements lsp + INTO table1_placements, table1_placement_count; + + WITH colocated_shards AS ( + SELECT tba.shardid as shardid1, tbb.shardid as shardid2 + FROM pg_dist_shard tba JOIN pg_dist_shard tbb USING(shardminvalue, shardmaxvalue) + WHERE tba.logicalrelid = table1 AND tbb.logicalrelid = table2), + right_shard_placements AS ( + SELECT cs.shardid1, cs.shardid2, sp.nodename, sp.nodeport + FROM colocated_shards cs LEFT JOIN pg_dist_shard_placement sp ON(cs.shardid2 = sp.shardid) + WHERE sp.shardstate = 1) + SELECT + array_agg((rsp.shardid1, rsp.shardid2, rsp.nodename, rsp.nodeport)::colocation_placement_type + ORDER BY shardid1, shardid2, nodename, nodeport), + count(distinct rsp.shardid2) + FROM right_shard_placements rsp + INTO table2_placements, table2_placement_count; + + IF (table1_shard_count != table1_placement_count + OR table1_placement_count != table2_placement_count) THEN + RETURN false; + END IF; + + IF (array_length(table1_placements, 1) != array_length(table2_placements, 1)) THEN + RETURN false; + END IF; + + FOR i IN 1..array_length(table1_placements,1) LOOP + IF (table1_placements[i].nodename != table2_placements[i].nodename OR + table1_placements[i].nodeport != table2_placements[i].nodeport) THEN + RETURN false; + END IF; + END LOOP; + + RETURN true; +END; +$function$; + + +CREATE OR REPLACE FUNCTION citus_run_on_all_workers(command text, + parallel bool default true, + OUT nodename text, + OUT nodeport int, + OUT success bool, + OUT result text) + RETURNS SETOF record + LANGUAGE plpgsql + AS $function$ +DECLARE + workers text[]; + ports int[]; + commands text[]; +BEGIN + WITH citus_workers AS ( + SELECT * FROM master_get_active_worker_nodes() ORDER BY node_name, node_port) + SELECT array_agg(node_name), array_agg(node_port), array_agg(command) + INTO workers, ports, commands + FROM citus_workers; + + RETURN QUERY SELECT * FROM master_run_on_worker(workers, ports, commands, parallel); +END; +$function$; + + +CREATE OR REPLACE FUNCTION 
citus_run_on_all_placements(table_name regclass, command text, + parallel bool default true, + OUT nodename text, + OUT nodeport int, + OUT shardid bigint, + OUT success bool, OUT result text) + RETURNS SETOF record + LANGUAGE plpgsql + AS $function$ +DECLARE + workers text[]; + ports int[]; + shards bigint[]; + commands text[]; +BEGIN + WITH citus_placements AS ( + SELECT + ds.logicalrelid::regclass AS tablename, + ds.shardid AS shardid, + shard_name(ds.logicalrelid, ds.shardid) AS shardname, + dsp.nodename AS nodename, dsp.nodeport::int AS nodeport + FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid) + WHERE dsp.shardstate = 1 and ds.logicalrelid::regclass = table_name + ORDER BY ds.logicalrelid, ds.shardid, dsp.nodename, dsp.nodeport) + SELECT + array_agg(cp.nodename), array_agg(cp.nodeport), array_agg(cp.shardid), + array_agg(format(command, cp.shardname)) + INTO workers, ports, shards, commands + FROM citus_placements cp; + + RETURN QUERY + SELECT r.node_name, r.node_port, shards[ordinality], + r.success, r.result + FROM master_run_on_worker(workers, ports, commands, parallel) WITH ORDINALITY r; +END; +$function$; + + +CREATE OR REPLACE FUNCTION citus_run_on_all_colocated_placements(table_name1 regclass, + table_name2 regclass, + command text, + parallel bool default true, + OUT nodename text, + OUT nodeport int, + OUT shardid1 bigint, + OUT shardid2 bigint, + OUT success bool, + OUT result text) + RETURNS SETOF record + LANGUAGE plpgsql + AS $function$ +DECLARE + workers text[]; + ports int[]; + shards1 bigint[]; + shards2 bigint[]; + commands text[]; +BEGIN + IF NOT (SELECT citus_tables_colocated(table_name1, table_name2)) THEN + RAISE EXCEPTION 'tables % and % are not co-located', table_name1, table_name2; + END IF; + + WITH active_shard_placements AS ( + SELECT + ds.logicalrelid, + ds.shardid AS shardid, + shard_name(ds.logicalrelid, ds.shardid) AS shardname, + ds.shardminvalue AS shardminvalue, + ds.shardmaxvalue AS shardmaxvalue, + dsp.nodename AS nodename, + dsp.nodeport::int AS nodeport + FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid) + WHERE dsp.shardstate = 1 and (ds.logicalrelid::regclass = table_name1 or + ds.logicalrelid::regclass = table_name2) + ORDER BY ds.logicalrelid, ds.shardid, dsp.nodename, dsp.nodeport), + citus_colocated_placements AS ( + SELECT + a.logicalrelid::regclass AS tablename1, + a.shardid AS shardid1, + shard_name(a.logicalrelid, a.shardid) AS shardname1, + b.logicalrelid::regclass AS tablename2, + b.shardid AS shardid2, + shard_name(b.logicalrelid, b.shardid) AS shardname2, + a.nodename AS nodename, + a.nodeport::int AS nodeport + FROM + active_shard_placements a, active_shard_placements b + WHERE + a.shardminvalue = b.shardminvalue AND + a.shardmaxvalue = b.shardmaxvalue AND + a.logicalrelid != b.logicalrelid AND + a.nodename = b.nodename AND + a.nodeport = b.nodeport AND + a.logicalrelid::regclass = table_name1 AND + b.logicalrelid::regclass = table_name2 + ORDER BY a.logicalrelid, a.shardid, nodename, nodeport) + SELECT + array_agg(cp.nodename), array_agg(cp.nodeport), array_agg(cp.shardid1), + array_agg(cp.shardid2), array_agg(format(command, cp.shardname1, cp.shardname2)) + INTO workers, ports, shards1, shards2, commands + FROM citus_colocated_placements cp; + + RETURN QUERY SELECT r.node_name, r.node_port, shards1[ordinality], + shards2[ordinality], r.success, r.result + FROM master_run_on_worker(workers, ports, commands, parallel) WITH ORDINALITY r; +END; +$function$; + + +CREATE OR REPLACE FUNCTION 
citus_run_on_all_shards(table_name regclass, command text, + parallel bool default true, + OUT shardid bigint, + OUT success bool, + OUT result text) + RETURNS SETOF record + LANGUAGE plpgsql + AS $function$ +DECLARE + workers text[]; + ports int[]; + shards bigint[]; + commands text[]; + shard_count int; +BEGIN + SELECT COUNT(*) INTO shard_count FROM pg_dist_shard + WHERE logicalrelid = table_name; + + WITH citus_shards AS ( + SELECT ds.logicalrelid::regclass AS tablename, + ds.shardid AS shardid, + shard_name(ds.logicalrelid, ds.shardid) AS shardname, + array_agg(dsp.nodename) AS nodenames, + array_agg(dsp.nodeport) AS nodeports + FROM pg_dist_shard ds LEFT JOIN pg_dist_shard_placement dsp USING (shardid) + WHERE dsp.shardstate = 1 and ds.logicalrelid::regclass = table_name + GROUP BY ds.logicalrelid, ds.shardid + ORDER BY ds.logicalrelid, ds.shardid) + SELECT + array_agg(cs.nodenames[1]), array_agg(cs.nodeports[1]), array_agg(cs.shardid), + array_agg(format(command, cs.shardname)) + INTO workers, ports, shards, commands + FROM citus_shards cs; + + IF (shard_count != array_length(workers, 1)) THEN + RAISE NOTICE 'some shards do not have active placements'; + END IF; + + RETURN QUERY + SELECT shards[ordinality], r.success, r.result + FROM master_run_on_worker(workers, ports, commands, parallel) WITH ORDINALITY r; +END; +$function$; diff --git a/src/backend/distributed/master/master_citus_tools.c b/src/backend/distributed/master/master_citus_tools.c new file mode 100644 index 000000000..d7d571e97 --- /dev/null +++ b/src/backend/distributed/master/master_citus_tools.c @@ -0,0 +1,566 @@ +/*------------------------------------------------------------------------- + * + * master_citus_tools.c + * UDF to run multi shard/worker queries + * + * This file contains functions to run commands on other worker/shards. + * + * Copyright (c) 2016-2016, Citus Data, Inc. 
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "funcapi.h"
+#include "libpq-fe.h"
+#include "miscadmin.h"
+
+
+#include "access/htup_details.h"
+#include "catalog/pg_type.h"
+#include "distributed/connection_cache.h"
+#include "distributed/metadata_cache.h"
+#include "distributed/multi_server_executor.h"
+#include "distributed/worker_protocol.h"
+#include "lib/stringinfo.h"
+#include "utils/builtins.h"
+
+#include "distributed/multi_client_executor.h"
+
+
+PG_FUNCTION_INFO_V1(master_run_on_worker);
+
+static int ParseCommandParameters(FunctionCallInfo fcinfo, StringInfo **nodeNameArray,
+                                  int **nodePortsArray, StringInfo **commandStringArray,
+                                  bool *parallel);
+static void ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray,
+                                                     int *nodePortArray,
+                                                     StringInfo *commandStringArray,
+                                                     bool *statusArray,
+                                                     StringInfo *resultStringArray,
+                                                     int commandCount);
+static bool GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus,
+                                         StringInfo queryResultString);
+static bool EvaluateQueryResult(PGconn *connection, PGresult *queryResult,
+                                StringInfo queryResultString);
+static void StoreErrorMessage(PGconn *connection, StringInfo queryResultString);
+static void ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray,
+                                           int *nodePortArray,
+                                           StringInfo *commandStringArray,
+                                           bool *statusArray,
+                                           StringInfo *resultStringArray,
+                                           int commandCount);
+static bool ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort,
+                                        char *queryString, StringInfo queryResult);
+static Tuplestorestate * CreateTupleStore(TupleDesc tupleDescriptor,
+                                          StringInfo *nodeNameArray, int *nodePortArray,
+                                          bool *statusArray,
+                                          StringInfo *resultArray, int commandCount);
+
+
+/*
+ * master_run_on_worker executes the given queries/commands on the specified
+ * workers and returns the success status and query/command result for each.
+ * The expected input is three arrays containing node names, node ports, and
+ * query strings, plus a boolean flag that requests parallel execution. Upon
+ * completion, the function returns one (node_name, node_port, success, result)
+ * tuple per command. The calling user's credentials are used to connect to
+ * the remote nodes.
+ */ +Datum +master_run_on_worker(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + MemoryContext per_query_ctx = NULL; + MemoryContext oldcontext = NULL; + TupleDesc tupleDescriptor = NULL; + Tuplestorestate *tupleStore = NULL; + bool parallelExecution = false; + StringInfo *nodeNameArray = NULL; + int *nodePortArray = NULL; + StringInfo *commandStringArray = NULL; + bool *statusArray = NULL; + StringInfo *resultArray = NULL; + int commandIndex = 0; + int commandCount = 0; + + /* check to see if caller supports us returning a tuplestore */ + if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize)) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("materialize mode required, but it is not " + "allowed in this context"))); + } + + commandCount = ParseCommandParameters(fcinfo, &nodeNameArray, &nodePortArray, + &commandStringArray, ¶llelExecution); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + /* get the requested return tuple description */ + tupleDescriptor = CreateTupleDescCopy(rsinfo->expectedDesc); + + /* + * Check to make sure we have correct tuple descriptor + */ + if (tupleDescriptor->natts != 4 || + tupleDescriptor->attrs[0]->atttypid != TEXTOID || + tupleDescriptor->attrs[1]->atttypid != INT4OID || + tupleDescriptor->attrs[2]->atttypid != BOOLOID || + tupleDescriptor->attrs[3]->atttypid != TEXTOID) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_DEFINITION), + errmsg("query-specified return tuple and " + "function return type are not compatible"))); + } + + /* prepare storage for status and result values */ + statusArray = palloc0(commandCount * sizeof(bool)); + resultArray = palloc0(commandCount * sizeof(StringInfo)); + for (commandIndex = 0; commandIndex < commandCount; commandIndex++) + { + resultArray[commandIndex] = makeStringInfo(); + } + + if (parallelExecution) + { + ExecuteCommandsInParallelAndStoreResults(nodeNameArray, nodePortArray, + commandStringArray, + statusArray, resultArray, commandCount); + } + else + { + ExecuteCommandsAndStoreResults(nodeNameArray, nodePortArray, commandStringArray, + statusArray, resultArray, commandCount); + } + + /* let the caller know we're sending back a tuplestore */ + rsinfo->returnMode = SFRM_Materialize; + tupleStore = CreateTupleStore(tupleDescriptor, + nodeNameArray, nodePortArray, statusArray, + resultArray, commandCount); + rsinfo->setResult = tupleStore; + rsinfo->setDesc = tupleDescriptor; + + MemoryContextSwitchTo(oldcontext); + + PG_RETURN_VOID(); +} + + +/* ParseCommandParameters reads call parameters and fills in data structures */ +static int +ParseCommandParameters(FunctionCallInfo fcinfo, StringInfo **nodeNameArray, + int **nodePortsArray, StringInfo **commandStringArray, + bool *parallel) +{ + ArrayType *nodeNameArrayObject = PG_GETARG_ARRAYTYPE_P(0); + ArrayType *nodePortArrayObject = PG_GETARG_ARRAYTYPE_P(1); + ArrayType *commandStringArrayObject = PG_GETARG_ARRAYTYPE_P(2); + bool parallelExecution = PG_GETARG_BOOL(3); + int nodeNameCount = ArrayObjectCount(nodeNameArrayObject); + int nodePortCount = ArrayObjectCount(nodePortArrayObject); + int commandStringCount = ArrayObjectCount(commandStringArrayObject); + Datum *nodeNameDatumArray = DeconstructArrayObject(nodeNameArrayObject); + Datum *nodePortDatumArray = DeconstructArrayObject(nodePortArrayObject); + Datum *commandStringDatumArray = DeconstructArrayObject(commandStringArrayObject); + int index = 0; + StringInfo *nodeNames = NULL; + 
int *nodePorts = NULL;
+    StringInfo *commandStrings = NULL;
+
+    if (nodeNameCount != nodePortCount || nodeNameCount != commandStringCount)
+    {
+        ereport(ERROR,
+                (errcode(ERRCODE_SYNTAX_ERROR),
+                 errmsg("expected same number of node name, port, and query string")));
+    }
+
+    nodeNames = palloc0(nodeNameCount * sizeof(StringInfo));
+    nodePorts = palloc0(nodeNameCount * sizeof(int));
+    commandStrings = palloc0(nodeNameCount * sizeof(StringInfo));
+
+    for (index = 0; index < nodeNameCount; index++)
+    {
+        text *nodeNameText = DatumGetTextP(nodeNameDatumArray[index]);
+        char *nodeName = text_to_cstring(nodeNameText);
+        int32 nodePort = DatumGetInt32(nodePortDatumArray[index]);
+        text *commandText = DatumGetTextP(commandStringDatumArray[index]);
+        char *commandString = text_to_cstring(commandText);
+
+        nodeNames[index] = makeStringInfo();
+        commandStrings[index] = makeStringInfo();
+
+        appendStringInfo(nodeNames[index], "%s", nodeName);
+        nodePorts[index] = nodePort;
+        appendStringInfo(commandStrings[index], "%s", commandString);
+    }
+
+    *nodeNameArray = nodeNames;
+    *nodePortsArray = nodePorts;
+    *commandStringArray = commandStrings;
+    *parallel = parallelExecution;
+
+    return nodeNameCount;
+}
+
+
+/*
+ * ExecuteCommandsInParallelAndStoreResults connects to each node specified in
+ * nodeNameArray and nodePortArray, and executes the commands in
+ * commandStringArray in parallel. The execution success status and result are
+ * reported for each command in statusArray and resultStringArray. Each array
+ * contains commandCount items.
+ */
+static void
+ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePortArray,
+                                         StringInfo *commandStringArray,
+                                         bool *statusArray, StringInfo *resultStringArray,
+                                         int commandCount)
+{
+    int commandIndex = 0;
+    char *nodeUser = CurrentUserName();
+    PGconn **connectionArray = palloc0(commandCount * sizeof(PGconn *));
+    int finishedCount = 0;
+
+    /* establish connections */
+    for (commandIndex = 0; commandIndex < commandCount; commandIndex++)
+    {
+        char *nodeName = nodeNameArray[commandIndex]->data;
+        int nodePort = nodePortArray[commandIndex];
+        PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser);
+        StringInfo queryResultString = resultStringArray[commandIndex];
+
+        statusArray[commandIndex] = true;
+
+        connectionArray[commandIndex] = connection;
+
+        if (connection == NULL)
+        {
+            appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName,
+                             (int) nodePort);
+            statusArray[commandIndex] = false;
+            finishedCount++;
+        }
+    }
+
+    /* send queries at once */
+    for (commandIndex = 0; commandIndex < commandCount; commandIndex++)
+    {
+        int querySent = 0;
+        PGconn *connection = connectionArray[commandIndex];
+        char *queryString = commandStringArray[commandIndex]->data;
+        StringInfo queryResultString = resultStringArray[commandIndex];
+
+        /*
+         * If we don't have a connection, there is nothing to send; the error
+         * string should already have been filled.
+         */
+        if (connection == NULL)
+        {
+            continue;
+        }
+
+        querySent = PQsendQuery(connection, queryString);
+
+        if (querySent == 0)
+        {
+            StoreErrorMessage(connection, queryResultString);
+            statusArray[commandIndex] = false;
+            PQfinish(connection);
+            connectionArray[commandIndex] = NULL;
+            finishedCount++;
+        }
+    }
+
+    /* check for query results */
+    while (finishedCount < commandCount)
+    {
+        for (commandIndex = 0; commandIndex < commandCount; commandIndex++)
+        {
+            PGconn *connection = connectionArray[commandIndex];
+            StringInfo queryResultString = resultStringArray[commandIndex];
+            bool success = false;
+            bool queryFinished = false;
+
+            if (connection == NULL)
+            {
+                continue;
+            }
+
+            queryFinished = GetConnectionStatusAndResult(connection, &success,
+                                                         queryResultString);
+
+            if (queryFinished)
+            {
+                finishedCount++;
+                statusArray[commandIndex] = success;
+                connectionArray[commandIndex] = NULL;
+                PQfinish(connection);
+            }
+        }
+
+        CHECK_FOR_INTERRUPTS();
+
+        if (finishedCount < commandCount)
+        {
+            long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L;
+            pg_usleep(sleepIntervalPerCycle);
+        }
+    }
+
+    pfree(connectionArray);
+}
+
+
+/*
+ * GetConnectionStatusAndResult checks the active connection and returns true
+ * if query execution is finished (with either success or failure). Upon
+ * completion of the query, its success/failure is reported in resultStatus
+ * and its result in queryResultString.
+ */
+static bool
+GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus,
+                             StringInfo queryResultString)
+{
+    bool finished = true;
+    ConnStatusType connectionStatus = PQstatus(connection);
+    int consumeInput = 0;
+    PGresult *queryResult = NULL;
+    bool success = false;
+
+    *resultStatus = false;
+    resetStringInfo(queryResultString);
+
+    if (connectionStatus == CONNECTION_BAD)
+    {
+        appendStringInfo(queryResultString, "connection lost");
+        return finished;
+    }
+
+    consumeInput = PQconsumeInput(connection);
+    if (consumeInput == 0)
+    {
+        appendStringInfo(queryResultString, "query result unavailable");
+        return finished;
+    }
+
+    /* check later if busy */
+    if (PQisBusy(connection) != 0)
+    {
+        finished = false;
+        return finished;
+    }
+
+    /* query result is available at this point */
+    queryResult = PQgetResult(connection);
+    success = EvaluateQueryResult(connection, queryResult, queryResultString);
+    PQclear(queryResult);
+
+    *resultStatus = success;
+    finished = true;
+    return true;
+}
+
+
+/*
+ * EvaluateQueryResult gets the query result from connection and returns
+ * true if the query executed successfully, false otherwise. A query result
+ * or an error message is returned in queryResultString. The function requires
+ * the query to return a single-column, single-row result, and reports an
+ * error otherwise.
+ */
+static bool
+EvaluateQueryResult(PGconn *connection, PGresult *queryResult,
+                    StringInfo queryResultString)
+{
+    bool success = false;
+
+    ExecStatusType resultStatus = PQresultStatus(queryResult);
+    if (resultStatus == PGRES_COMMAND_OK)
+    {
+        char *commandStatus = PQcmdStatus(queryResult);
+        appendStringInfo(queryResultString, "%s", commandStatus);
+        success = true;
+    }
+    else if (resultStatus == PGRES_TUPLES_OK)
+    {
+        int ntuples = PQntuples(queryResult);
+        int nfields = PQnfields(queryResult);
+
+        /* error if the query returns more than one row or more than one column */
+        if (nfields != 1)
+        {
+            appendStringInfo(queryResultString,
+                             "expected a single column in query target");
+        }
+        else if (ntuples > 1)
+        {
+            appendStringInfo(queryResultString,
+                             "expected a single row in query result");
+        }
+        else
+        {
+            int row = 0;
+            int column = 0;
+            if (!PQgetisnull(queryResult, row, column))
+            {
+                char *queryResultValue = PQgetvalue(queryResult, row, column);
+                appendStringInfo(queryResultString, "%s", queryResultValue);
+            }
+            success = true;
+        }
+    }
+    else
+    {
+        StoreErrorMessage(connection, queryResultString);
+    }
+
+    return success;
+}
+
+
+/*
+ * StoreErrorMessage gets the error message from connection and stores it
+ * in queryResultString. It should be called only when an error is present;
+ * otherwise it stores a default error message.
+ */
+static void
+StoreErrorMessage(PGconn *connection, StringInfo queryResultString)
+{
+    char *errorMessage = PQerrorMessage(connection);
+    char *firstNewlineIndex = NULL;
+
+    /* fall back to a default message if no error message is reported */
+    if (errorMessage == NULL)
+    {
+        errorMessage = "An error occurred while running the query";
+    }
+
+    /* trim the error message at the line break */
+    firstNewlineIndex = strchr(errorMessage, '\n');
+    if (firstNewlineIndex != NULL)
+    {
+        *firstNewlineIndex = '\0';
+    }
+
+    appendStringInfo(queryResultString, "%s", errorMessage);
+}
+
+
+/*
+ * ExecuteCommandsAndStoreResults connects to each node specified in
+ * nodeNameArray and nodePortArray, and executes the commands in
+ * commandStringArray sequentially. The execution success status and result
+ * are reported for each command in statusArray and resultStringArray. Each
+ * array contains commandCount items.
+ */
+static void
+ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray, int *nodePortArray,
+                               StringInfo *commandStringArray, bool *statusArray,
+                               StringInfo *resultStringArray, int commandCount)
+{
+    int commandIndex = 0;
+    for (commandIndex = 0; commandIndex < commandCount; commandIndex++)
+    {
+        char *nodeName = nodeNameArray[commandIndex]->data;
+        int32 nodePort = nodePortArray[commandIndex];
+        bool success = false;
+        char *queryString = commandStringArray[commandIndex]->data;
+        StringInfo queryResultString = resultStringArray[commandIndex];
+
+        success = ExecuteRemoteQueryOrCommand(nodeName, nodePort, queryString,
+                                              queryResultString);
+
+        statusArray[commandIndex] = success;
+
+        CHECK_FOR_INTERRUPTS();
+    }
+}
+
+
+/*
+ * ExecuteRemoteQueryOrCommand executes a query at the specified remote node
+ * using the calling user's credentials. The function returns the query status
+ * (success/failure) and the query result. The query is expected to return a
+ * single target column containing zero or one row.
+ */ +static bool +ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString, + StringInfo queryResultString) +{ + char *nodeUser = CurrentUserName(); + PGconn *nodeConnection = ConnectToNode(nodeName, nodePort, nodeUser); + bool success = false; + + if (nodeConnection == NULL) + { + appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName, + (int) nodePort); + return false; + } + + PG_TRY(); + { + PGresult *queryResult = PQexec(nodeConnection, queryString); + success = EvaluateQueryResult(nodeConnection, queryResult, queryResultString); + + PQclear(queryResult); + + /* close the connection */ + PQfinish(nodeConnection); + nodeConnection = NULL; + } + PG_CATCH(); + { + StoreErrorMessage(nodeConnection, queryResultString); + + /* close the connection */ + PQfinish(nodeConnection); + nodeConnection = NULL; + } + PG_END_TRY(); + + return success; +} + + +/* CreateTupleStore prepares result tuples from individual query results */ +static Tuplestorestate * +CreateTupleStore(TupleDesc tupleDescriptor, + StringInfo *nodeNameArray, int *nodePortArray, bool *statusArray, + StringInfo *resultArray, int commandCount) +{ + Tuplestorestate *tupleStore = tuplestore_begin_heap(true, false, work_mem); + int commandIndex = 0; + bool nulls[4] = { false, false, false, false }; + + for (commandIndex = 0; commandIndex < commandCount; commandIndex++) + { + Datum values[4]; + HeapTuple tuple = NULL; + StringInfo nodeNameString = nodeNameArray[commandIndex]; + StringInfo resultString = resultArray[commandIndex]; + text *nodeNameText = cstring_to_text_with_len(nodeNameString->data, + nodeNameString->len); + text *resultText = cstring_to_text_with_len(resultString->data, + resultString->len); + + values[0] = PointerGetDatum(nodeNameText); + values[1] = Int32GetDatum(nodePortArray[commandIndex]); + values[2] = BoolGetDatum(statusArray[commandIndex]); + values[3] = PointerGetDatum(resultText); + + tuple = heap_form_tuple(tupleDescriptor, values, nulls); + tuplestore_puttuple(tupleStore, tuple); + + heap_freetuple(tuple); + pfree(nodeNameText); + pfree(resultText); + } + + tuplestore_donestoring(tupleStore); + + return tupleStore; +} diff --git a/src/test/regress/expected/multi_citus_tools.out b/src/test/regress/expected/multi_citus_tools.out new file mode 100644 index 000000000..3e604087a --- /dev/null +++ b/src/test/regress/expected/multi_citus_tools.out @@ -0,0 +1,370 @@ +-- +-- MULTI CITUS TOOLS +-- +-- tests UDFs created for citus tools +-- +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1240000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1240000; +-- the function is not exposed explicitly, create the entry point +CREATE OR REPLACE FUNCTION master_run_on_worker(worker_name text[], port integer[], + command text[], + parallel boolean default false, + OUT node_name text, OUT node_port integer, + OUT success boolean, OUT result text) + RETURNS SETOF record + LANGUAGE C STABLE STRICT + AS 'citus.so', $$master_run_on_worker$$; +-- test with invalid port, prevent OS dependent warning from being displayed +SET client_min_messages to ERROR; +SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[], + ARRAY['select count(*) from pg_dist_shard']::text[], + false); + node_name | node_port | success | result +-----------+-----------+---------+------------------------------------ + localhost | 666 | f | failed to connect to localhost:666 +(1 row) + +SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[], + 
ARRAY['select count(*) from pg_dist_shard']::text[], + true); + node_name | node_port | success | result +-----------+-----------+---------+------------------------------------ + localhost | 666 | f | failed to connect to localhost:666 +(1 row) + +RESET client_min_messages; +-- store worker node name and port +SELECT quote_literal(node_name) as node_name, node_port as node_port + FROM master_get_active_worker_nodes() + ORDER BY node_port + LIMIT 1 \gset +-- connect to the first worker and ask for shard count, should return 0 +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select count(*) from pg_dist_shard']::text[], + false); + node_name | node_port | success | result +-----------+-----------+---------+-------- + localhost | 57637 | t | 0 +(1 row) + +-- connect to the first worker and ask for shards, should fail with +-- expecting a single column error +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select * from pg_dist_shard']::text[], + false); + node_name | node_port | success | result +-----------+-----------+---------+------------------------------------------ + localhost | 57637 | f | expected a single column in query target +(1 row) + +-- query result may only contain a single row +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select a from generate_series(1,2) a']::text[], + false); + node_name | node_port | success | result +-----------+-----------+---------+--------------------------------------- + localhost | 57637 | f | expected a single row in query result +(1 row) + + +-- send multiple queries +SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], + ARRAY[:node_port, :node_port]::int[], + ARRAY['select a from generate_series(1,1) a', + 'select a from generate_series(2,2) a']::text[], + false); + node_name | node_port | success | result +-----------+-----------+---------+-------- + localhost | 57637 | t | 1 + localhost | 57637 | t | 2 +(2 rows) + +-- send multiple queries, one fails +SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], + ARRAY[:node_port, :node_port]::int[], + ARRAY['select a from generate_series(1,1) a', + 'select a from generate_series(1,2) a']::text[], + false); + node_name | node_port | success | result +-----------+-----------+---------+--------------------------------------- + localhost | 57637 | t | 1 + localhost | 57637 | f | expected a single row in query result +(2 rows) + +-- send multiple queries, both fail +SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], + ARRAY[:node_port, :node_port]::int[], + ARRAY['select a from generate_series(1,2) a', + 'select a from generate_series(1,2) a']::text[], + false); + node_name | node_port | success | result +-----------+-----------+---------+--------------------------------------- + localhost | 57637 | f | expected a single row in query result + localhost | 57637 | f | expected a single row in query result +(2 rows) + +-- can create tables at worker +SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], + ARRAY[:node_port, :node_port]::int[], + ARRAY['create table first_table(a int, b int)', + 'create table second_table(a int, b int)']::text[], + false); + node_name | node_port | success | result +-----------+-----------+---------+-------------- + localhost | 57637 | t | CREATE TABLE + localhost | 57637 | t | CREATE TABLE +(2 rows) + +-- can insert into table +SELECT * FROM 
master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['insert into first_table select a,a from generate_series(1,20) a']::text[], + false); + node_name | node_port | success | result +-----------+-----------+---------+------------- + localhost | 57637 | t | INSERT 0 20 +(1 row) + +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select count(*) from first_table']::text[], + false); + node_name | node_port | success | result +-----------+-----------+---------+-------- + localhost | 57637 | t | 20 +(1 row) + +-- insert into second table twice +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['insert into second_table select * from first_table']::text[], + false); + node_name | node_port | success | result +-----------+-----------+---------+------------- + localhost | 57637 | t | INSERT 0 20 +(1 row) + +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['insert into second_table select * from first_table']::text[], + false); + node_name | node_port | success | result +-----------+-----------+---------+------------- + localhost | 57637 | t | INSERT 0 20 +(1 row) + +-- check inserted values at second table +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select count(*) from second_table']::text[], + false); + node_name | node_port | success | result +-----------+-----------+---------+-------- + localhost | 57637 | t | 40 +(1 row) + +-- store worker node name and port again +-- previously set variables become unusable after some number of uses +SELECT quote_literal(node_name) as node_name, node_port as node_port + FROM master_get_active_worker_nodes() + ORDER BY node_port + LIMIT 1 \gset +-- create index on tables +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['create index first_table_index on first_table(a)']::text[], + false); + node_name | node_port | success | result +-----------+-----------+---------+-------------- + localhost | 57637 | t | CREATE INDEX +(1 row) + +-- drop created tables +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['drop table first_table']::text[], + false); + node_name | node_port | success | result +-----------+-----------+---------+------------ + localhost | 57637 | t | DROP TABLE +(1 row) + +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['drop table second_table']::text[], + false); + node_name | node_port | success | result +-----------+-----------+---------+------------ + localhost | 57637 | t | DROP TABLE +(1 row) + + +-- verify table is dropped +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select count(*) from second_table']::text[], + false); + node_name | node_port | success | result +-----------+-----------+---------+------------------------------------------------ + localhost | 57637 | f | ERROR: relation "second_table" does not exist +(1 row) + +-- +-- Run the same tests in parallel +-- +-- connect to the first worker and ask for shard count, should return 0 +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select count(*) from pg_dist_shard']::text[], + true); + node_name | node_port | success | result +-----------+-----------+---------+-------- + localhost | 57637 | t | 0 +(1 row) + +-- connect to 
the first worker and ask for shards, should fail with +-- expecting a single column error +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select * from pg_dist_shard']::text[], + true); + node_name | node_port | success | result +-----------+-----------+---------+------------------------------------------ + localhost | 57637 | f | expected a single column in query target +(1 row) + +-- query result may only contain a single row +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select a from generate_series(1,2) a']::text[], + true); + node_name | node_port | success | result +-----------+-----------+---------+--------------------------------------- + localhost | 57637 | f | expected a single row in query result +(1 row) + + +-- send multiple queries +SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], + ARRAY[:node_port, :node_port]::int[], + ARRAY['select a from generate_series(1,1) a', + 'select a from generate_series(2,2) a']::text[], + true); + node_name | node_port | success | result +-----------+-----------+---------+-------- + localhost | 57637 | t | 1 + localhost | 57637 | t | 2 +(2 rows) + +-- send multiple queries, one fails +SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], + ARRAY[:node_port, :node_port]::int[], + ARRAY['select a from generate_series(1,1) a', + 'select a from generate_series(1,2) a']::text[], + true); + node_name | node_port | success | result +-----------+-----------+---------+--------------------------------------- + localhost | 57637 | t | 1 + localhost | 57637 | f | expected a single row in query result +(2 rows) + +-- send multiple queries, both fail +SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], + ARRAY[:node_port, :node_port]::int[], + ARRAY['select a from generate_series(1,2) a', + 'select a from generate_series(1,2) a']::text[], + true); + node_name | node_port | success | result +-----------+-----------+---------+--------------------------------------- + localhost | 57637 | f | expected a single row in query result + localhost | 57637 | f | expected a single row in query result +(2 rows) + +-- can create tables at worker +SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], + ARRAY[:node_port, :node_port]::int[], + ARRAY['create table first_table(a int, b int)', + 'create table second_table(a int, b int)']::text[], + true); + node_name | node_port | success | result +-----------+-----------+---------+-------------- + localhost | 57637 | t | CREATE TABLE + localhost | 57637 | t | CREATE TABLE +(2 rows) + +-- store worker node name and port again +-- previously set variables become unusable after some number of uses +SELECT quote_literal(node_name) as node_name, node_port as node_port + FROM master_get_active_worker_nodes() + ORDER BY node_port + LIMIT 1 \gset +-- can insert into table +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['insert into first_table select a,a from generate_series(1,20) a']::text[], + true); + node_name | node_port | success | result +-----------+-----------+---------+------------- + localhost | 57637 | t | INSERT 0 20 +(1 row) + +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select count(*) from first_table']::text[], + true); + node_name | node_port | success | result +-----------+-----------+---------+-------- + localhost | 57637 | t | 20 
+(1 row) + +-- insert into second table twice +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['insert into second_table select * from first_table']::text[], + true); + node_name | node_port | success | result +-----------+-----------+---------+------------- + localhost | 57637 | t | INSERT 0 20 +(1 row) + +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['insert into second_table select * from first_table']::text[], + true); + node_name | node_port | success | result +-----------+-----------+---------+------------- + localhost | 57637 | t | INSERT 0 20 +(1 row) + +-- check inserted values at second table +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select count(*) from second_table']::text[], + true); + node_name | node_port | success | result +-----------+-----------+---------+-------- + localhost | 57637 | t | 40 +(1 row) + +-- create index on tables +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['create index first_table_index on first_table(a)']::text[], + true); + node_name | node_port | success | result +-----------+-----------+---------+-------------- + localhost | 57637 | t | CREATE INDEX +(1 row) + +-- drop created tables +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['drop table first_table']::text[], + true); + node_name | node_port | success | result +-----------+-----------+---------+------------ + localhost | 57637 | t | DROP TABLE +(1 row) + +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['drop table second_table']::text[], + true); + node_name | node_port | success | result +-----------+-----------+---------+------------ + localhost | 57637 | t | DROP TABLE +(1 row) + +-- verify table is dropped +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select count(*) from second_table']::text[], + true); + node_name | node_port | success | result +-----------+-----------+---------+------------------------------------------------ + localhost | 57637 | f | ERROR: relation "second_table" does not exist +(1 row) + +-- drop the function after use +DROP FUNCTION master_run_on_worker(worker_name text[], port integer[], command text[], + parallel boolean, OUT node_name text, OUT node_port integer, + OUT success boolean, OUT result text); diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index acb53e49a..85375c8a5 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -183,3 +183,8 @@ test: multi_expire_table_cache # ---------- test: multi_colocation_utils test: multi_colocated_shard_transfer + +# ---------- +# multi_citus_tools tests utility functions written for citus tools +# ---------- +test: multi_citus_tools diff --git a/src/test/regress/sql/multi_citus_tools.sql b/src/test/regress/sql/multi_citus_tools.sql new file mode 100644 index 000000000..6e17dd1c0 --- /dev/null +++ b/src/test/regress/sql/multi_citus_tools.sql @@ -0,0 +1,216 @@ +-- +-- MULTI CITUS TOOLS +-- +-- tests UDFs created for citus tools +-- + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1240000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1240000; + +-- the function is not exposed explicitly, create the entry point +CREATE OR REPLACE FUNCTION master_run_on_worker(worker_name text[], port integer[], + 
command text[], + parallel boolean default false, + OUT node_name text, OUT node_port integer, + OUT success boolean, OUT result text) + RETURNS SETOF record + LANGUAGE C STABLE STRICT + AS 'citus.so', $$master_run_on_worker$$; + +-- test with invalid port, prevent OS dependent warning from being displayed +SET client_min_messages to ERROR; +SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[], + ARRAY['select count(*) from pg_dist_shard']::text[], + false); + +SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[], + ARRAY['select count(*) from pg_dist_shard']::text[], + true); +RESET client_min_messages; + +-- store worker node name and port +SELECT quote_literal(node_name) as node_name, node_port as node_port + FROM master_get_active_worker_nodes() + ORDER BY node_port + LIMIT 1 \gset + +-- connect to the first worker and ask for shard count, should return 0 +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select count(*) from pg_dist_shard']::text[], + false); + +-- connect to the first worker and ask for shards, should fail with +-- expecting a single column error +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select * from pg_dist_shard']::text[], + false); + +-- query result may only contain a single row +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select a from generate_series(1,2) a']::text[], + false); + +-- send multiple queries +SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], + ARRAY[:node_port, :node_port]::int[], + ARRAY['select a from generate_series(1,1) a', + 'select a from generate_series(2,2) a']::text[], + false); + +-- send multiple queries, one fails +SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], + ARRAY[:node_port, :node_port]::int[], + ARRAY['select a from generate_series(1,1) a', + 'select a from generate_series(1,2) a']::text[], + false); + +-- send multiple queries, both fail +SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], + ARRAY[:node_port, :node_port]::int[], + ARRAY['select a from generate_series(1,2) a', + 'select a from generate_series(1,2) a']::text[], + false); +-- can create tables at worker +SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], + ARRAY[:node_port, :node_port]::int[], + ARRAY['create table first_table(a int, b int)', + 'create table second_table(a int, b int)']::text[], + false); + +-- can insert into table +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['insert into first_table select a,a from generate_series(1,20) a']::text[], + false); +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select count(*) from first_table']::text[], + false); +-- insert into second table twice +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['insert into second_table select * from first_table']::text[], + false); +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['insert into second_table select * from first_table']::text[], + false); + +-- check inserted values at second table +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select count(*) from second_table']::text[], + false); +-- store worker node name and port 
again +-- previously set variables become unusable after some number of uses +SELECT quote_literal(node_name) as node_name, node_port as node_port + FROM master_get_active_worker_nodes() + ORDER BY node_port + LIMIT 1 \gset + +-- create index on tables +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['create index first_table_index on first_table(a)']::text[], + false); +-- drop created tables +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['drop table first_table']::text[], + false); +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['drop table second_table']::text[], + false); + +-- verify table is dropped +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select count(*) from second_table']::text[], + false); +-- +-- Run the same tests in parallel +-- + +-- connect to the first worker and ask for shard count, should return 0 +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select count(*) from pg_dist_shard']::text[], + true); + +-- connect to the first worker and ask for shards, should fail with +-- expecting a single column error +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select * from pg_dist_shard']::text[], + true); + +-- query result may only contain a single row +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select a from generate_series(1,2) a']::text[], + true); + +-- send multiple queries +SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], + ARRAY[:node_port, :node_port]::int[], + ARRAY['select a from generate_series(1,1) a', + 'select a from generate_series(2,2) a']::text[], + true); + +-- send multiple queries, one fails +SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], + ARRAY[:node_port, :node_port]::int[], + ARRAY['select a from generate_series(1,1) a', + 'select a from generate_series(1,2) a']::text[], + true); + +-- send multiple queries, both fail +SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], + ARRAY[:node_port, :node_port]::int[], + ARRAY['select a from generate_series(1,2) a', + 'select a from generate_series(1,2) a']::text[], + true); +-- can create tables at worker +SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], + ARRAY[:node_port, :node_port]::int[], + ARRAY['create table first_table(a int, b int)', + 'create table second_table(a int, b int)']::text[], + true); + +-- store worker node name and port again +-- previously set variables become unusable after some number of uses +SELECT quote_literal(node_name) as node_name, node_port as node_port + FROM master_get_active_worker_nodes() + ORDER BY node_port + LIMIT 1 \gset + +-- can insert into table +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['insert into first_table select a,a from generate_series(1,20) a']::text[], + true); +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select count(*) from first_table']::text[], + true); +-- insert into second table twice +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['insert into second_table select * from first_table']::text[], + true); +SELECT * FROM 
master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['insert into second_table select * from first_table']::text[], + true); + +-- check inserted values at second table +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select count(*) from second_table']::text[], + true); + +-- create index on tables +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['create index first_table_index on first_table(a)']::text[], + true); +-- drop created tables +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['drop table first_table']::text[], + true); +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['drop table second_table']::text[], + true); +-- verify table is dropped +SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], + ARRAY['select count(*) from second_table']::text[], + true); + +-- drop the function after use +DROP FUNCTION master_run_on_worker(worker_name text[], port integer[], command text[], + parallel boolean, OUT node_name text, OUT node_port integer, + OUT success boolean, OUT result text); +
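
For reference, the snippet below sketches how the UDFs introduced by this patch
compose once citus_tools.sql is loaded. It is illustrative only: the worker host
names, ports, and the my_table relation are placeholders, and the wrapper
functions require active workers to be registered in
master_get_active_worker_nodes().

-- run one command per node; the three arrays must have equal lengths, and
-- each query may return at most one column and one row
SELECT node_name, node_port, success, result
FROM master_run_on_worker(ARRAY['worker-1.example.com', 'worker-2.example.com'],
                          ARRAY[5432, 5432],
                          ARRAY['select count(*) from pg_class',
                                'select count(*) from pg_class'],
                          true /* parallel */);

-- or let the wrapper build the node/port/command arrays from the active
-- worker list itself
SELECT * FROM citus_run_on_all_workers('select version()');

-- the placement-level wrappers substitute each shard name for %s in the
-- command via format()
SELECT * FROM citus_run_on_all_placements('my_table', 'select count(*) from %s');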