Add master_run_on_worker UDF

pull/843/head
Murat Tuncer 2016-10-14 11:30:40 +03:00
parent 0bce20dd74
commit b453f6c7ab
5 changed files with 1442 additions and 0 deletions

@@ -0,0 +1,285 @@
/*
* citus_tools.sql
* Contains definitions of citus_tools UDFs
* - citus_run_on_all_workers
* - citus_run_on_all_placements
* - citus_run_on_all_colocated_placements
* - citus_run_on_all_shards
*
* These functions depend on the presence of the master_run_on_worker UDF
*/
CREATE OR REPLACE FUNCTION master_run_on_worker(worker_name text[], port integer[],
command text[], parallel boolean,
OUT node_name text, OUT node_port integer,
OUT success boolean, OUT result text )
RETURNS SETOF record
LANGUAGE C STABLE STRICT
AS 'citus.so', $$master_run_on_worker$$;
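--
-- Minimal usage sketch (host, port, and query below are illustrative
-- placeholders): run one query per worker and read back a row of
-- (node_name, node_port, success, result) for each. The query must return
-- a single column and at most a single row.
--
--   SELECT * FROM master_run_on_worker(ARRAY['localhost'], ARRAY[5432],
--                                      ARRAY['SELECT count(*) FROM pg_class'],
--                                      false);
--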
CREATE TYPE colocation_placement_type AS (
shardid1 bigint,
shardid2 bigint,
nodename text,
nodeport bigint
);
--
-- citus_tables_colocated returns true if the given tables are co-located, false otherwise.
-- The function checks shard definitions and matches shard placements of the given tables.
--
CREATE OR REPLACE FUNCTION citus_tables_colocated(table1 regclass, table2 regclass)
RETURNS bool
LANGUAGE plpgsql
AS $function$
DECLARE
colocated_shard_count int;
table1_shard_count int;
table2_shard_count int;
table1_placement_count int;
table2_placement_count int;
table1_placements colocation_placement_type[];
table2_placements colocation_placement_type[];
BEGIN
SELECT count(*),
(SELECT count(*) FROM pg_dist_shard a WHERE a.logicalrelid = table1),
(SELECT count(*) FROM pg_dist_shard b WHERE b.logicalrelid = table2)
INTO colocated_shard_count, table1_shard_count, table2_shard_count
FROM pg_dist_shard tba JOIN pg_dist_shard tbb USING(shardminvalue, shardmaxvalue)
WHERE tba.logicalrelid = table1 AND tbb.logicalrelid = table2;
IF (table1_shard_count != table2_shard_count OR
table1_shard_count != colocated_shard_count)
THEN
RETURN false;
END IF;
WITH colocated_shards AS (
SELECT tba.shardid as shardid1, tbb.shardid as shardid2
FROM pg_dist_shard tba JOIN pg_dist_shard tbb USING(shardminvalue, shardmaxvalue)
WHERE tba.logicalrelid = table1 AND tbb.logicalrelid = table2),
left_shard_placements AS (
SELECT cs.shardid1, cs.shardid2, sp.nodename, sp.nodeport
FROM colocated_shards cs JOIN pg_dist_shard_placement sp ON (cs.shardid1 = sp.shardid)
WHERE sp.shardstate = 1)
SELECT
array_agg((lsp.shardid1, lsp.shardid2, lsp.nodename, lsp.nodeport)::colocation_placement_type
ORDER BY shardid1, shardid2, nodename, nodeport),
count(distinct lsp.shardid1)
FROM left_shard_placements lsp
INTO table1_placements, table1_placement_count;
WITH colocated_shards AS (
SELECT tba.shardid as shardid1, tbb.shardid as shardid2
FROM pg_dist_shard tba JOIN pg_dist_shard tbb USING(shardminvalue, shardmaxvalue)
WHERE tba.logicalrelid = table1 AND tbb.logicalrelid = table2),
right_shard_placements AS (
SELECT cs.shardid1, cs.shardid2, sp.nodename, sp.nodeport
FROM colocated_shards cs LEFT JOIN pg_dist_shard_placement sp ON(cs.shardid2 = sp.shardid)
WHERE sp.shardstate = 1)
SELECT
array_agg((rsp.shardid1, rsp.shardid2, rsp.nodename, rsp.nodeport)::colocation_placement_type
ORDER BY shardid1, shardid2, nodename, nodeport),
count(distinct rsp.shardid2)
FROM right_shard_placements rsp
INTO table2_placements, table2_placement_count;
IF (table1_shard_count != table1_placement_count
OR table1_placement_count != table2_placement_count) THEN
RETURN false;
END IF;
IF (array_length(table1_placements, 1) != array_length(table2_placements, 1)) THEN
RETURN false;
END IF;
FOR i IN 1..array_length(table1_placements,1) LOOP
IF (table1_placements[i].nodename != table2_placements[i].nodename OR
table1_placements[i].nodeport != table2_placements[i].nodeport) THEN
RETURN false;
END IF;
END LOOP;
RETURN true;
END;
$function$;
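--
-- Usage sketch (table names are placeholders): returns true only when both
-- tables have matching shard intervals and matching active placements.
--
--   SELECT citus_tables_colocated('table_a'::regclass, 'table_b'::regclass);
--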
CREATE OR REPLACE FUNCTION citus_run_on_all_workers(command text,
parallel bool default true,
OUT nodename text,
OUT nodeport int,
OUT success bool,
OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
workers text[];
ports int[];
commands text[];
BEGIN
WITH citus_workers AS (
SELECT * FROM master_get_active_worker_nodes() ORDER BY node_name, node_port)
SELECT array_agg(node_name), array_agg(node_port), array_agg(command)
INTO workers, ports, commands
FROM citus_workers;
RETURN QUERY SELECT * FROM master_run_on_worker(workers, ports, commands, parallel);
END;
$function$;
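--
-- Usage sketch: run the same single-column, single-row query on every active
-- worker; parallel execution defaults to true.
--
--   SELECT * FROM citus_run_on_all_workers('SELECT version()');
--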
CREATE OR REPLACE FUNCTION citus_run_on_all_placements(table_name regclass, command text,
parallel bool default true,
OUT nodename text,
OUT nodeport int,
OUT shardid bigint,
OUT success bool, OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
workers text[];
ports int[];
shards bigint[];
commands text[];
BEGIN
WITH citus_placements AS (
SELECT
ds.logicalrelid::regclass AS tablename,
ds.shardid AS shardid,
shard_name(ds.logicalrelid, ds.shardid) AS shardname,
dsp.nodename AS nodename, dsp.nodeport::int AS nodeport
FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid)
WHERE dsp.shardstate = 1 and ds.logicalrelid::regclass = table_name
ORDER BY ds.logicalrelid, ds.shardid, dsp.nodename, dsp.nodeport)
SELECT
array_agg(cp.nodename), array_agg(cp.nodeport), array_agg(cp.shardid),
array_agg(format(command, cp.shardname))
INTO workers, ports, shards, commands
FROM citus_placements cp;
RETURN QUERY
SELECT r.node_name, r.node_port, shards[ordinality],
r.success, r.result
FROM master_run_on_worker(workers, ports, commands, parallel) WITH ORDINALITY r;
END;
$function$;
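--
-- Usage sketch (table name is a placeholder): the %s in the command is
-- replaced with each placement's shard name via format().
--
--   SELECT * FROM citus_run_on_all_placements('my_table',
--                                             'SELECT count(*) FROM %s');
--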
CREATE OR REPLACE FUNCTION citus_run_on_all_colocated_placements(table_name1 regclass,
table_name2 regclass,
command text,
parallel bool default true,
OUT nodename text,
OUT nodeport int,
OUT shardid1 bigint,
OUT shardid2 bigint,
OUT success bool,
OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
workers text[];
ports int[];
shards1 bigint[];
shards2 bigint[];
commands text[];
BEGIN
IF NOT (SELECT citus_tables_colocated(table_name1, table_name2)) THEN
RAISE EXCEPTION 'tables % and % are not co-located', table_name1, table_name2;
END IF;
WITH active_shard_placements AS (
SELECT
ds.logicalrelid,
ds.shardid AS shardid,
shard_name(ds.logicalrelid, ds.shardid) AS shardname,
ds.shardminvalue AS shardminvalue,
ds.shardmaxvalue AS shardmaxvalue,
dsp.nodename AS nodename,
dsp.nodeport::int AS nodeport
FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid)
WHERE dsp.shardstate = 1 and (ds.logicalrelid::regclass = table_name1 or
ds.logicalrelid::regclass = table_name2)
ORDER BY ds.logicalrelid, ds.shardid, dsp.nodename, dsp.nodeport),
citus_colocated_placements AS (
SELECT
a.logicalrelid::regclass AS tablename1,
a.shardid AS shardid1,
shard_name(a.logicalrelid, a.shardid) AS shardname1,
b.logicalrelid::regclass AS tablename2,
b.shardid AS shardid2,
shard_name(b.logicalrelid, b.shardid) AS shardname2,
a.nodename AS nodename,
a.nodeport::int AS nodeport
FROM
active_shard_placements a, active_shard_placements b
WHERE
a.shardminvalue = b.shardminvalue AND
a.shardmaxvalue = b.shardmaxvalue AND
a.logicalrelid != b.logicalrelid AND
a.nodename = b.nodename AND
a.nodeport = b.nodeport AND
a.logicalrelid::regclass = table_name1 AND
b.logicalrelid::regclass = table_name2
ORDER BY a.logicalrelid, a.shardid, nodename, nodeport)
SELECT
array_agg(cp.nodename), array_agg(cp.nodeport), array_agg(cp.shardid1),
array_agg(cp.shardid2), array_agg(format(command, cp.shardname1, cp.shardname2))
INTO workers, ports, shards1, shards2, commands
FROM citus_colocated_placements cp;
RETURN QUERY SELECT r.node_name, r.node_port, shards1[ordinality],
shards2[ordinality], r.success, r.result
FROM master_run_on_worker(workers, ports, commands, parallel) WITH ORDINALITY r;
END;
$function$;
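--
-- Usage sketch (table and column names are placeholders): the two %s
-- placeholders receive the co-located shard names of table_name1 and
-- table_name2, in that order.
--
--   SELECT * FROM citus_run_on_all_colocated_placements(
--       'table_a', 'table_b',
--       'SELECT count(*) FROM %s a JOIN %s b USING (id)');
--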
CREATE OR REPLACE FUNCTION citus_run_on_all_shards(table_name regclass, command text,
parallel bool default true,
OUT shardid bigint,
OUT success bool,
OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
workers text[];
ports int[];
shards bigint[];
commands text[];
shard_count int;
BEGIN
SELECT COUNT(*) INTO shard_count FROM pg_dist_shard
WHERE logicalrelid = table_name;
WITH citus_shards AS (
SELECT ds.logicalrelid::regclass AS tablename,
ds.shardid AS shardid,
shard_name(ds.logicalrelid, ds.shardid) AS shardname,
array_agg(dsp.nodename) AS nodenames,
array_agg(dsp.nodeport) AS nodeports
FROM pg_dist_shard ds LEFT JOIN pg_dist_shard_placement dsp USING (shardid)
WHERE dsp.shardstate = 1 and ds.logicalrelid::regclass = table_name
GROUP BY ds.logicalrelid, ds.shardid
ORDER BY ds.logicalrelid, ds.shardid)
SELECT
array_agg(cs.nodenames[1]), array_agg(cs.nodeports[1]), array_agg(cs.shardid),
array_agg(format(command, cs.shardname))
INTO workers, ports, shards, commands
FROM citus_shards cs;
IF (shard_count != array_length(workers, 1)) THEN
RAISE NOTICE 'some shards do not have active placements';
END IF;
RETURN QUERY
SELECT shards[ordinality], r.success, r.result
FROM master_run_on_worker(workers, ports, commands, parallel) WITH ORDINALITY r;
END;
$function$;
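--
-- Usage sketch (table name is a placeholder): runs the command once per
-- shard, on the first active placement of each shard; %s receives the
-- shard name.
--
--   SELECT * FROM citus_run_on_all_shards('my_table',
--                                         'SELECT pg_table_size(''%s'')');
--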

@@ -0,0 +1,566 @@
/*-------------------------------------------------------------------------
*
* master_citus_tools.c
* UDF to run multi shard/worker queries
*
* This file contains functions to run commands on other worker/shards.
*
* Copyright (c) 2016-2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "access/htup_details.h"
#include "catalog/pg_type.h"
#include "distributed/connection_cache.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_server_executor.h"
#include "distributed/worker_protocol.h"
#include "lib/stringinfo.h"
#include "utils/builtins.h"
#include "distributed/multi_client_executor.h"
PG_FUNCTION_INFO_V1(master_run_on_worker);
static int ParseCommandParameters(FunctionCallInfo fcinfo, StringInfo **nodeNameArray,
int **nodePortsArray, StringInfo **commandStringArray,
bool *parallel);
static void ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray,
int *nodePortArray,
StringInfo *commandStringArray,
bool *statusArray,
StringInfo *resultStringArray,
int commandCount);
static bool GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus,
StringInfo queryResultString);
static bool EvaluateQueryResult(PGconn *connection, PGresult *queryResult, StringInfo
queryResultString);
static void StoreErrorMessage(PGconn *connection, StringInfo queryResultString);
static void ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray,
int *nodePortArray,
StringInfo *commandStringArray,
bool *statusArray,
StringInfo *resultStringArray,
int commandCount);
static bool ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort,
char *queryString, StringInfo queryResult);
static Tuplestorestate * CreateTupleStore(TupleDesc tupleDescriptor,
StringInfo *nodeNameArray, int *nodePortArray,
bool *statusArray,
StringInfo *resultArray, int commandCount);
/*
* master_run_on_worker executes the given queries/commands on the specified
* workers and returns the success status and query/command result for each.
* The expected input is three arrays containing node names, node ports, and
* query strings, plus a boolean flag to request parallel execution. The
* function returns (node_name, node_port, success, result) tuples upon
* completion of the queries. The calling user's credentials are used to
* connect to the remote nodes.
*/
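/*
 * Minimal invocation sketch from SQL (hypothetical host and port values),
 * assuming the UDF is declared as in citus_tools.sql:
 *
 *   SELECT * FROM master_run_on_worker(ARRAY['localhost'], ARRAY[5432],
 *                                      ARRAY['SELECT 1'], false);
 */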
Datum
master_run_on_worker(PG_FUNCTION_ARGS)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
MemoryContext per_query_ctx = NULL;
MemoryContext oldcontext = NULL;
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = NULL;
bool parallelExecution = false;
StringInfo *nodeNameArray = NULL;
int *nodePortArray = NULL;
StringInfo *commandStringArray = NULL;
bool *statusArray = NULL;
StringInfo *resultArray = NULL;
int commandIndex = 0;
int commandCount = 0;
/* check to see if caller supports us returning a tuplestore */
if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize))
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("materialize mode required, but it is not "
"allowed in this context")));
}
commandCount = ParseCommandParameters(fcinfo, &nodeNameArray, &nodePortArray,
&commandStringArray, &parallelExecution);
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
/* get the requested return tuple description */
tupleDescriptor = CreateTupleDescCopy(rsinfo->expectedDesc);
/*
* Check to make sure we have the correct tuple descriptor
*/
if (tupleDescriptor->natts != 4 ||
tupleDescriptor->attrs[0]->atttypid != TEXTOID ||
tupleDescriptor->attrs[1]->atttypid != INT4OID ||
tupleDescriptor->attrs[2]->atttypid != BOOLOID ||
tupleDescriptor->attrs[3]->atttypid != TEXTOID)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_DEFINITION),
errmsg("query-specified return tuple and "
"function return type are not compatible")));
}
/* prepare storage for status and result values */
statusArray = palloc0(commandCount * sizeof(bool));
resultArray = palloc0(commandCount * sizeof(StringInfo));
for (commandIndex = 0; commandIndex < commandCount; commandIndex++)
{
resultArray[commandIndex] = makeStringInfo();
}
if (parallelExecution)
{
ExecuteCommandsInParallelAndStoreResults(nodeNameArray, nodePortArray,
commandStringArray,
statusArray, resultArray, commandCount);
}
else
{
ExecuteCommandsAndStoreResults(nodeNameArray, nodePortArray, commandStringArray,
statusArray, resultArray, commandCount);
}
/* let the caller know we're sending back a tuplestore */
rsinfo->returnMode = SFRM_Materialize;
tupleStore = CreateTupleStore(tupleDescriptor,
nodeNameArray, nodePortArray, statusArray,
resultArray, commandCount);
rsinfo->setResult = tupleStore;
rsinfo->setDesc = tupleDescriptor;
MemoryContextSwitchTo(oldcontext);
PG_RETURN_VOID();
}
/* ParseCommandParameters reads call parameters and fills in data structures */
static int
ParseCommandParameters(FunctionCallInfo fcinfo, StringInfo **nodeNameArray,
int **nodePortsArray, StringInfo **commandStringArray,
bool *parallel)
{
ArrayType *nodeNameArrayObject = PG_GETARG_ARRAYTYPE_P(0);
ArrayType *nodePortArrayObject = PG_GETARG_ARRAYTYPE_P(1);
ArrayType *commandStringArrayObject = PG_GETARG_ARRAYTYPE_P(2);
bool parallelExecution = PG_GETARG_BOOL(3);
int nodeNameCount = ArrayObjectCount(nodeNameArrayObject);
int nodePortCount = ArrayObjectCount(nodePortArrayObject);
int commandStringCount = ArrayObjectCount(commandStringArrayObject);
Datum *nodeNameDatumArray = DeconstructArrayObject(nodeNameArrayObject);
Datum *nodePortDatumArray = DeconstructArrayObject(nodePortArrayObject);
Datum *commandStringDatumArray = DeconstructArrayObject(commandStringArrayObject);
int index = 0;
StringInfo *nodeNames = NULL;
int *nodePorts = NULL;
StringInfo *commandStrings = NULL;
if (nodeNameCount != nodePortCount || nodeNameCount != commandStringCount)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("expected same number of node name, port, and query string")));
}
nodeNames = palloc0(nodeNameCount * sizeof(StringInfo));
nodePorts = palloc0(nodeNameCount * sizeof(int));
commandStrings = palloc0(nodeNameCount * sizeof(StringInfo));
for (index = 0; index < nodeNameCount; index++)
{
text *nodeNameText = DatumGetTextP(nodeNameDatumArray[index]);
char *nodeName = text_to_cstring(nodeNameText);
int32 nodePort = DatumGetInt32(nodePortDatumArray[index]);
text *commandText = DatumGetTextP(commandStringDatumArray[index]);
char *commandString = text_to_cstring(commandText);
nodeNames[index] = makeStringInfo();
commandStrings[index] = makeStringInfo();
appendStringInfo(nodeNames[index], "%s", nodeName);
nodePorts[index] = nodePort;
appendStringInfo(commandStrings[index], "%s", commandString);
}
*nodeNameArray = nodeNames;
*nodePortsArray = nodePorts;
*commandStringArray = commandStrings;
*parallel = parallelExecution;
return nodeNameCount;
}
/*
* ExecuteCommandsInParallelAndStoreResults connects to each node specified in
* nodeNameArray and nodePortArray, and executes the commands in
* commandStringArray in parallel. Execution success status and result are
* reported for each command in statusArray and resultStringArray. Each array
* contains commandCount items.
*/
static void
ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePortArray,
StringInfo *commandStringArray,
bool *statusArray, StringInfo *resultStringArray,
int commandCount)
{
int commandIndex = 0;
char *nodeUser = CurrentUserName();
PGconn **connectionArray = palloc0(commandCount * sizeof(PGconn *));
int finishedCount = 0;
/* establish connections */
for (commandIndex = 0; commandIndex < commandCount; commandIndex++)
{
char *nodeName = nodeNameArray[commandIndex]->data;
int nodePort = nodePortArray[commandIndex];
PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser);
StringInfo queryResultString = resultStringArray[commandIndex];
statusArray[commandIndex] = true;
connectionArray[commandIndex] = connection;
if (connection == NULL)
{
appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName,
(int) nodePort);
statusArray[commandIndex] = false;
finishedCount++;
}
}
/* send queries at once */
for (commandIndex = 0; commandIndex < commandCount; commandIndex++)
{
int querySent = 0;
PGconn *connection = connectionArray[commandIndex];
char *queryString = commandStringArray[commandIndex]->data;
StringInfo queryResultString = resultStringArray[commandIndex];
/*
* If we don't have a connection, there is nothing to send; the error string
* should already have been filled.
*/
if (connection == NULL)
{
continue;
}
querySent = PQsendQuery(connection, queryString);
if (querySent == 0)
{
StoreErrorMessage(connection, queryResultString);
statusArray[commandIndex] = false;
PQfinish(connection);
connectionArray[commandIndex] = NULL;
finishedCount++;
}
}
/* check for query results */
while (finishedCount < commandCount)
{
for (commandIndex = 0; commandIndex < commandCount; commandIndex++)
{
PGconn *connection = connectionArray[commandIndex];
StringInfo queryResultString = resultStringArray[commandIndex];
bool success = false;
bool queryFinished = false;
if (connection == NULL)
{
continue;
}
queryFinished = GetConnectionStatusAndResult(connection, &success,
queryResultString);
if (queryFinished)
{
finishedCount++;
statusArray[commandIndex] = success;
connectionArray[commandIndex] = NULL;
PQfinish(connection);
}
}
CHECK_FOR_INTERRUPTS();
if (finishedCount < commandCount)
{
long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L;
pg_usleep(sleepIntervalPerCycle);
}
}
pfree(connectionArray);
}
/*
* GetConnectionStatusAndResult checks the active connection and returns true
* if query execution is finished (whether it succeeded or failed). Upon
* completion, query success/failure is reported in resultStatus and the
* query result in queryResultString.
*/
static bool
GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus,
StringInfo queryResultString)
{
bool finished = true;
ConnStatusType connectionStatus = PQstatus(connection);
int consumeInput = 0;
PGresult *queryResult = NULL;
bool success = false;
*resultStatus = false;
resetStringInfo(queryResultString);
if (connectionStatus == CONNECTION_BAD)
{
appendStringInfo(queryResultString, "connection lost");
return finished;
}
consumeInput = PQconsumeInput(connection);
if (consumeInput == 0)
{
appendStringInfo(queryResultString, "query result unavailable");
return finished;
}
/* check later if busy */
if (PQisBusy(connection) != 0)
{
finished = false;
return finished;
}
/* query result is available at this point */
queryResult = PQgetResult(connection);
success = EvaluateQueryResult(connection, queryResult, queryResultString);
PQclear(queryResult);
*resultStatus = success;
finished = true;
return true;
}
/*
* EvaluateQueryResult gets the query result from the connection and returns
* true if the query executed successfully, false otherwise. A query result
* or an error message is returned in queryResultString. The function requires
* that the query return a single column and at most a single row; it reports
* an error otherwise.
*/
static bool
EvaluateQueryResult(PGconn *connection, PGresult *queryResult,
StringInfo queryResultString)
{
bool success = false;
ExecStatusType resultStatus = PQresultStatus(queryResult);
if (resultStatus == PGRES_COMMAND_OK)
{
char *commandStatus = PQcmdStatus(queryResult);
appendStringInfo(queryResultString, "%s", commandStatus);
success = true;
}
else if (resultStatus == PGRES_TUPLES_OK)
{
int ntuples = PQntuples(queryResult);
int nfields = PQnfields(queryResult);
/* error if the query returns more than one row or more than one column */
if (nfields != 1)
{
appendStringInfo(queryResultString,
"expected a single column in query target");
}
else if (ntuples > 1)
{
appendStringInfo(queryResultString,
"expected a single row in query result");
}
else
{
int row = 0;
int column = 0;
if (!PQgetisnull(queryResult, row, column))
{
char *queryResultValue = PQgetvalue(queryResult, row, column);
appendStringInfo(queryResultString, "%s", queryResultValue);
}
success = true;
}
}
else
{
StoreErrorMessage(connection, queryResultString);
}
return success;
}
/*
* StoreErrorMessage gets the error message from the connection and stores it
* in queryResultString. It should be called only when an error is present;
* otherwise it stores a default error message.
*/
static void
StoreErrorMessage(PGconn *connection, StringInfo queryResultString)
{
char *errorMessage = PQerrorMessage(connection);
char *firstNewlineIndex = NULL;
/* put a default error message if no error message is reported */
if (errorMessage == NULL || *errorMessage == '\0')
{
errorMessage = "An error occurred while running the query";
}
/* trim the error message at the line break */
firstNewlineIndex = strchr(errorMessage, '\n');
if (firstNewlineIndex != NULL)
{
*firstNewlineIndex = '\0';
}
appendStringInfo(queryResultString, "%s", errorMessage);
}
/*
* ExecuteCommandsAndStoreResults connects to each node specified in
* nodeNameArray and nodePortArray, and executes the commands in
* commandStringArray sequentially. Execution success status and result are
* reported for each command in statusArray and resultStringArray. Each array
* contains commandCount items.
*/
static void
ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray, int *nodePortArray,
StringInfo *commandStringArray, bool *statusArray,
StringInfo *resultStringArray, int commandCount)
{
int commandIndex = 0;
for (commandIndex = 0; commandIndex < commandCount; commandIndex++)
{
char *nodeName = nodeNameArray[commandIndex]->data;
int32 nodePort = nodePortArray[commandIndex];
bool success = false;
char *queryString = commandStringArray[commandIndex]->data;
StringInfo queryResultString = resultStringArray[commandIndex];
success = ExecuteRemoteQueryOrCommand(nodeName, nodePort, queryString,
queryResultString);
statusArray[commandIndex] = success;
CHECK_FOR_INTERRUPTS();
}
}
/*
* ExecuteRemoteQueryOrCommand executes a query at the specified remote node
* using the calling user's credentials. The function returns the query status
* (success/failure) and the query result. The query is expected to return a
* single column containing zero or one rows.
*/
static bool
ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString,
StringInfo queryResultString)
{
char *nodeUser = CurrentUserName();
PGconn *nodeConnection = ConnectToNode(nodeName, nodePort, nodeUser);
bool success = false;
if (nodeConnection == NULL)
{
appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName,
(int) nodePort);
return false;
}
PG_TRY();
{
PGresult *queryResult = PQexec(nodeConnection, queryString);
success = EvaluateQueryResult(nodeConnection, queryResult, queryResultString);
PQclear(queryResult);
/* close the connection */
PQfinish(nodeConnection);
nodeConnection = NULL;
}
PG_CATCH();
{
StoreErrorMessage(nodeConnection, queryResultString);
/* close the connection */
PQfinish(nodeConnection);
nodeConnection = NULL;
}
PG_END_TRY();
return success;
}
/* CreateTupleStore prepares result tuples from individual query results */
static Tuplestorestate *
CreateTupleStore(TupleDesc tupleDescriptor,
StringInfo *nodeNameArray, int *nodePortArray, bool *statusArray,
StringInfo *resultArray, int commandCount)
{
Tuplestorestate *tupleStore = tuplestore_begin_heap(true, false, work_mem);
int commandIndex = 0;
bool nulls[4] = { false, false, false, false };
for (commandIndex = 0; commandIndex < commandCount; commandIndex++)
{
Datum values[4];
HeapTuple tuple = NULL;
StringInfo nodeNameString = nodeNameArray[commandIndex];
StringInfo resultString = resultArray[commandIndex];
text *nodeNameText = cstring_to_text_with_len(nodeNameString->data,
nodeNameString->len);
text *resultText = cstring_to_text_with_len(resultString->data,
resultString->len);
values[0] = PointerGetDatum(nodeNameText);
values[1] = Int32GetDatum(nodePortArray[commandIndex]);
values[2] = BoolGetDatum(statusArray[commandIndex]);
values[3] = PointerGetDatum(resultText);
tuple = heap_form_tuple(tupleDescriptor, values, nulls);
tuplestore_puttuple(tupleStore, tuple);
heap_freetuple(tuple);
pfree(nodeNameText);
pfree(resultText);
}
tuplestore_donestoring(tupleStore);
return tupleStore;
}

@@ -0,0 +1,370 @@
--
-- MULTI CITUS TOOLS
--
-- tests UDFs created for citus tools
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1240000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1240000;
-- the function is not exposed explicitly, create the entry point
CREATE OR REPLACE FUNCTION master_run_on_worker(worker_name text[], port integer[],
command text[],
parallel boolean default false,
OUT node_name text, OUT node_port integer,
OUT success boolean, OUT result text)
RETURNS SETOF record
LANGUAGE C STABLE STRICT
AS 'citus.so', $$master_run_on_worker$$;
-- test with invalid port, prevent OS-dependent warning from being displayed
SET client_min_messages to ERROR;
SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[],
ARRAY['select count(*) from pg_dist_shard']::text[],
false);
node_name | node_port | success | result
-----------+-----------+---------+------------------------------------
localhost | 666 | f | failed to connect to localhost:666
(1 row)
SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[],
ARRAY['select count(*) from pg_dist_shard']::text[],
true);
node_name | node_port | success | result
-----------+-----------+---------+------------------------------------
localhost | 666 | f | failed to connect to localhost:666
(1 row)
RESET client_min_messages;
-- store worker node name and port
SELECT quote_literal(node_name) as node_name, node_port as node_port
FROM master_get_active_worker_nodes()
ORDER BY node_port
LIMIT 1 \gset
-- connect to the first worker and ask for shard count, should return 0
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select count(*) from pg_dist_shard']::text[],
false);
node_name | node_port | success | result
-----------+-----------+---------+--------
localhost | 57637 | t | 0
(1 row)
-- connect to the first worker and ask for shards, should fail with
-- expecting a single column error
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select * from pg_dist_shard']::text[],
false);
node_name | node_port | success | result
-----------+-----------+---------+------------------------------------------
localhost | 57637 | f | expected a single column in query target
(1 row)
-- query result may only contain a single row
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select a from generate_series(1,2) a']::text[],
false);
node_name | node_port | success | result
-----------+-----------+---------+---------------------------------------
localhost | 57637 | f | expected a single row in query result
(1 row)
-- send multiple queries
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
ARRAY[:node_port, :node_port]::int[],
ARRAY['select a from generate_series(1,1) a',
'select a from generate_series(2,2) a']::text[],
false);
node_name | node_port | success | result
-----------+-----------+---------+--------
localhost | 57637 | t | 1
localhost | 57637 | t | 2
(2 rows)
-- send multiple queries, one fails
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
ARRAY[:node_port, :node_port]::int[],
ARRAY['select a from generate_series(1,1) a',
'select a from generate_series(1,2) a']::text[],
false);
node_name | node_port | success | result
-----------+-----------+---------+---------------------------------------
localhost | 57637 | t | 1
localhost | 57637 | f | expected a single row in query result
(2 rows)
-- send multiple queries, both fail
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
ARRAY[:node_port, :node_port]::int[],
ARRAY['select a from generate_series(1,2) a',
'select a from generate_series(1,2) a']::text[],
false);
node_name | node_port | success | result
-----------+-----------+---------+---------------------------------------
localhost | 57637 | f | expected a single row in query result
localhost | 57637 | f | expected a single row in query result
(2 rows)
-- can create tables at worker
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
ARRAY[:node_port, :node_port]::int[],
ARRAY['create table first_table(a int, b int)',
'create table second_table(a int, b int)']::text[],
false);
node_name | node_port | success | result
-----------+-----------+---------+--------------
localhost | 57637 | t | CREATE TABLE
localhost | 57637 | t | CREATE TABLE
(2 rows)
-- can insert into table
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['insert into first_table select a,a from generate_series(1,20) a']::text[],
false);
node_name | node_port | success | result
-----------+-----------+---------+-------------
localhost | 57637 | t | INSERT 0 20
(1 row)
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select count(*) from first_table']::text[],
false);
node_name | node_port | success | result
-----------+-----------+---------+--------
localhost | 57637 | t | 20
(1 row)
-- insert into second table twice
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['insert into second_table select * from first_table']::text[],
false);
node_name | node_port | success | result
-----------+-----------+---------+-------------
localhost | 57637 | t | INSERT 0 20
(1 row)
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['insert into second_table select * from first_table']::text[],
false);
node_name | node_port | success | result
-----------+-----------+---------+-------------
localhost | 57637 | t | INSERT 0 20
(1 row)
-- check inserted values at second table
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select count(*) from second_table']::text[],
false);
node_name | node_port | success | result
-----------+-----------+---------+--------
localhost | 57637 | t | 40
(1 row)
-- store worker node name and port again
-- previously set variables become unusable after some number of uses
SELECT quote_literal(node_name) as node_name, node_port as node_port
FROM master_get_active_worker_nodes()
ORDER BY node_port
LIMIT 1 \gset
-- create index on tables
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['create index first_table_index on first_table(a)']::text[],
false);
node_name | node_port | success | result
-----------+-----------+---------+--------------
localhost | 57637 | t | CREATE INDEX
(1 row)
-- drop created tables
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['drop table first_table']::text[],
false);
node_name | node_port | success | result
-----------+-----------+---------+------------
localhost | 57637 | t | DROP TABLE
(1 row)
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['drop table second_table']::text[],
false);
node_name | node_port | success | result
-----------+-----------+---------+------------
localhost | 57637 | t | DROP TABLE
(1 row)
-- verify table is dropped
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select count(*) from second_table']::text[],
false);
node_name | node_port | success | result
-----------+-----------+---------+------------------------------------------------
localhost | 57637 | f | ERROR: relation "second_table" does not exist
(1 row)
--
-- Run the same tests in parallel
--
-- connect to the first worker and ask for shard count, should return 0
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select count(*) from pg_dist_shard']::text[],
true);
node_name | node_port | success | result
-----------+-----------+---------+--------
localhost | 57637 | t | 0
(1 row)
-- connect to the first worker and ask for shards, should fail with
-- expecting a single column error
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select * from pg_dist_shard']::text[],
true);
node_name | node_port | success | result
-----------+-----------+---------+------------------------------------------
localhost | 57637 | f | expected a single column in query target
(1 row)
-- query result may only contain a single row
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select a from generate_series(1,2) a']::text[],
true);
node_name | node_port | success | result
-----------+-----------+---------+---------------------------------------
localhost | 57637 | f | expected a single row in query result
(1 row)
-- send multiple queries
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
ARRAY[:node_port, :node_port]::int[],
ARRAY['select a from generate_series(1,1) a',
'select a from generate_series(2,2) a']::text[],
true);
node_name | node_port | success | result
-----------+-----------+---------+--------
localhost | 57637 | t | 1
localhost | 57637 | t | 2
(2 rows)
-- send multiple queries, one fails
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
ARRAY[:node_port, :node_port]::int[],
ARRAY['select a from generate_series(1,1) a',
'select a from generate_series(1,2) a']::text[],
true);
node_name | node_port | success | result
-----------+-----------+---------+---------------------------------------
localhost | 57637 | t | 1
localhost | 57637 | f | expected a single row in query result
(2 rows)
-- send multiple queries, both fail
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
ARRAY[:node_port, :node_port]::int[],
ARRAY['select a from generate_series(1,2) a',
'select a from generate_series(1,2) a']::text[],
true);
node_name | node_port | success | result
-----------+-----------+---------+---------------------------------------
localhost | 57637 | f | expected a single row in query result
localhost | 57637 | f | expected a single row in query result
(2 rows)
-- can create tables at worker
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
ARRAY[:node_port, :node_port]::int[],
ARRAY['create table first_table(a int, b int)',
'create table second_table(a int, b int)']::text[],
true);
node_name | node_port | success | result
-----------+-----------+---------+--------------
localhost | 57637 | t | CREATE TABLE
localhost | 57637 | t | CREATE TABLE
(2 rows)
-- store worker node name and port again
-- previously set variables become unusable after some number of uses
SELECT quote_literal(node_name) as node_name, node_port as node_port
FROM master_get_active_worker_nodes()
ORDER BY node_port
LIMIT 1 \gset
-- can insert into table
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['insert into first_table select a,a from generate_series(1,20) a']::text[],
true);
node_name | node_port | success | result
-----------+-----------+---------+-------------
localhost | 57637 | t | INSERT 0 20
(1 row)
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select count(*) from first_table']::text[],
true);
node_name | node_port | success | result
-----------+-----------+---------+--------
localhost | 57637 | t | 20
(1 row)
-- insert into second table twice
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['insert into second_table select * from first_table']::text[],
true);
node_name | node_port | success | result
-----------+-----------+---------+-------------
localhost | 57637 | t | INSERT 0 20
(1 row)
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['insert into second_table select * from first_table']::text[],
true);
node_name | node_port | success | result
-----------+-----------+---------+-------------
localhost | 57637 | t | INSERT 0 20
(1 row)
-- check inserted values at second table
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select count(*) from second_table']::text[],
true);
node_name | node_port | success | result
-----------+-----------+---------+--------
localhost | 57637 | t | 40
(1 row)
-- create index on tables
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['create index first_table_index on first_table(a)']::text[],
true);
node_name | node_port | success | result
-----------+-----------+---------+--------------
localhost | 57637 | t | CREATE INDEX
(1 row)
-- drop created tables
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['drop table first_table']::text[],
true);
node_name | node_port | success | result
-----------+-----------+---------+------------
localhost | 57637 | t | DROP TABLE
(1 row)
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['drop table second_table']::text[],
true);
node_name | node_port | success | result
-----------+-----------+---------+------------
localhost | 57637 | t | DROP TABLE
(1 row)
-- verify table is dropped
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select count(*) from second_table']::text[],
true);
node_name | node_port | success | result
-----------+-----------+---------+------------------------------------------------
localhost | 57637 | f | ERROR: relation "second_table" does not exist
(1 row)
-- drop the function after use
DROP FUNCTION master_run_on_worker(worker_name text[], port integer[], command text[],
parallel boolean, OUT node_name text, OUT node_port integer,
OUT success boolean, OUT result text);

@@ -183,3 +183,8 @@ test: multi_expire_table_cache
# ----------
test: multi_colocation_utils
test: multi_colocated_shard_transfer
# ----------
# multi_citus_tools tests utility functions written for citus tools
# ----------
test: multi_citus_tools

@@ -0,0 +1,216 @@
--
-- MULTI CITUS TOOLS
--
-- tests UDFs created for citus tools
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1240000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1240000;
-- the function is not exposed explicitly, create the entry point
CREATE OR REPLACE FUNCTION master_run_on_worker(worker_name text[], port integer[],
command text[],
parallel boolean default false,
OUT node_name text, OUT node_port integer,
OUT success boolean, OUT result text)
RETURNS SETOF record
LANGUAGE C STABLE STRICT
AS 'citus.so', $$master_run_on_worker$$;
-- test with invalid port, prevent OS-dependent warning from being displayed
SET client_min_messages to ERROR;
SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[],
ARRAY['select count(*) from pg_dist_shard']::text[],
false);
SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[],
ARRAY['select count(*) from pg_dist_shard']::text[],
true);
RESET client_min_messages;
-- store worker node name and port
SELECT quote_literal(node_name) as node_name, node_port as node_port
FROM master_get_active_worker_nodes()
ORDER BY node_port
LIMIT 1 \gset
-- connect to the first worker and ask for shard count, should return 0
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select count(*) from pg_dist_shard']::text[],
false);
-- connect to the first worker and ask for shards, should fail with
-- expecting a single column error
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select * from pg_dist_shard']::text[],
false);
-- query result may only contain a single row
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select a from generate_series(1,2) a']::text[],
false);
-- send multiple queries
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
ARRAY[:node_port, :node_port]::int[],
ARRAY['select a from generate_series(1,1) a',
'select a from generate_series(2,2) a']::text[],
false);
-- send multiple queries, one fails
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
ARRAY[:node_port, :node_port]::int[],
ARRAY['select a from generate_series(1,1) a',
'select a from generate_series(1,2) a']::text[],
false);
-- send multiple queries, both fail
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
ARRAY[:node_port, :node_port]::int[],
ARRAY['select a from generate_series(1,2) a',
'select a from generate_series(1,2) a']::text[],
false);
-- can create tables at worker
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
ARRAY[:node_port, :node_port]::int[],
ARRAY['create table first_table(a int, b int)',
'create table second_table(a int, b int)']::text[],
false);
-- can insert into table
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['insert into first_table select a,a from generate_series(1,20) a']::text[],
false);
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select count(*) from first_table']::text[],
false);
-- insert into second table twice
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['insert into second_table select * from first_table']::text[],
false);
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['insert into second_table select * from first_table']::text[],
false);
-- check inserted values at second table
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select count(*) from second_table']::text[],
false);
-- store worker node name and port again
-- previously set variables become unusable after some number of uses
SELECT quote_literal(node_name) as node_name, node_port as node_port
FROM master_get_active_worker_nodes()
ORDER BY node_port
LIMIT 1 \gset
-- create index on tables
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['create index first_table_index on first_table(a)']::text[],
false);
-- drop created tables
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['drop table first_table']::text[],
false);
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['drop table second_table']::text[],
false);
-- verify table is dropped
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select count(*) from second_table']::text[],
false);
--
-- Run the same tests in parallel
--
-- connect to the first worker and ask for shard count, should return 0
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select count(*) from pg_dist_shard']::text[],
true);
-- connect to the first worker and ask for shards, should fail with
-- expecting a single column error
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select * from pg_dist_shard']::text[],
true);
-- query result may only contain a single row
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select a from generate_series(1,2) a']::text[],
true);
-- send multiple queries
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
ARRAY[:node_port, :node_port]::int[],
ARRAY['select a from generate_series(1,1) a',
'select a from generate_series(2,2) a']::text[],
true);
-- send multiple queries, one fails
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
ARRAY[:node_port, :node_port]::int[],
ARRAY['select a from generate_series(1,1) a',
'select a from generate_series(1,2) a']::text[],
true);
-- send multiple queries, both fail
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
ARRAY[:node_port, :node_port]::int[],
ARRAY['select a from generate_series(1,2) a',
'select a from generate_series(1,2) a']::text[],
true);
-- can create tables at worker
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
ARRAY[:node_port, :node_port]::int[],
ARRAY['create table first_table(a int, b int)',
'create table second_table(a int, b int)']::text[],
true);
-- store worker node name and port again
-- previously set variables become unusable after some number of uses
SELECT quote_literal(node_name) as node_name, node_port as node_port
FROM master_get_active_worker_nodes()
ORDER BY node_port
LIMIT 1 \gset
-- can insert into table
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['insert into first_table select a,a from generate_series(1,20) a']::text[],
true);
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select count(*) from first_table']::text[],
true);
-- insert into second table twice
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['insert into second_table select * from first_table']::text[],
true);
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['insert into second_table select * from first_table']::text[],
true);
-- check inserted values at second table
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select count(*) from second_table']::text[],
true);
-- create index on tables
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['create index first_table_index on first_table(a)']::text[],
true);
-- drop created tables
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['drop table first_table']::text[],
true);
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['drop table second_table']::text[],
true);
-- verify table is dropped
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
ARRAY['select count(*) from second_table']::text[],
true);
-- drop the function after use
DROP FUNCTION master_run_on_worker(worker_name text[], port integer[], command text[],
parallel boolean, OUT node_name text, OUT node_port integer,
OUT success boolean, OUT result text);