mirror of https://github.com/citusdata/citus.git
commit
a2f6b29a6d
|
@ -0,0 +1,285 @@
|
|||
/*
|
||||
* citus_tools.sql
|
||||
* Contains definitions of citus_tools UDFs
|
||||
* - citus_run_on_all_workers
|
||||
* - citus_run_on_all_placements
|
||||
* - citus_run_on_all_colocated_placements
|
||||
* - citus_run_on_all_shards
|
||||
*
|
||||
* These functions depends on presence of UDF master_run_on_worker
|
||||
*/
|
||||
|
||||
-- Entry point into the C implementation in citus.so: runs command[i] on
-- (worker_name[i], port[i]) and returns one row per command with the
-- execution status and the query/command result.
CREATE OR REPLACE FUNCTION master_run_on_worker(worker_name text[], port integer[],
                                                command text[], parallel boolean,
                                                OUT node_name text, OUT node_port integer,
                                                OUT success boolean, OUT result text)
    RETURNS SETOF record
    LANGUAGE C STABLE STRICT
    AS 'citus.so', $$master_run_on_worker$$;
|
||||
|
||||
|
||||
-- Composite type describing one pair of co-located shard placements:
-- the two shard ids plus the node that hosts both placements.
CREATE TYPE colocation_placement_type AS (
    shardid1 bigint,
    shardid2 bigint,
    nodename text,
    nodeport bigint
);
|
||||
|
||||
--
-- citus_tables_colocated returns true if given tables are co-located, false otherwise.
-- The function checks shard definitions, matches shard placements for given tables.
--
CREATE OR REPLACE FUNCTION citus_tables_colocated(table1 regclass, table2 regclass)
    RETURNS bool
    LANGUAGE plpgsql
    AS $function$
DECLARE
    colocated_shard_count int;
    table1_shard_count int;
    table2_shard_count int;
    table1_placement_count int;
    table2_placement_count int;
    table1_placements colocation_placement_type[];
    table2_placements colocation_placement_type[];
BEGIN
    -- Count each table's shards and the shards whose (min, max) hash ranges
    -- match between the two tables; all three counts must agree.
    SELECT count(*),
           (SELECT count(*) FROM pg_dist_shard a WHERE a.logicalrelid = table1),
           (SELECT count(*) FROM pg_dist_shard b WHERE b.logicalrelid = table2)
    INTO colocated_shard_count, table1_shard_count, table2_shard_count
    FROM pg_dist_shard tba JOIN pg_dist_shard tbb USING(shardminvalue, shardmaxvalue)
    WHERE tba.logicalrelid = table1 AND tbb.logicalrelid = table2;

    IF (table1_shard_count != table2_shard_count OR
        table1_shard_count != colocated_shard_count)
    THEN
        RETURN false;
    END IF;

    -- Collect the active placements of table1's shards, deterministically
    -- ordered so they can be compared pairwise against table2's below.
    WITH colocated_shards AS (
        SELECT tba.shardid as shardid1, tbb.shardid as shardid2
        FROM pg_dist_shard tba JOIN pg_dist_shard tbb USING(shardminvalue, shardmaxvalue)
        WHERE tba.logicalrelid = table1 AND tbb.logicalrelid = table2),
    left_shard_placements AS (
        SELECT cs.shardid1, cs.shardid2, sp.nodename, sp.nodeport
        FROM colocated_shards cs JOIN pg_dist_shard_placement sp ON (cs.shardid1 = sp.shardid)
        WHERE sp.shardstate = 1)
    SELECT
        array_agg((lsp.shardid1, lsp.shardid2, lsp.nodename, lsp.nodeport)::colocation_placement_type
                  ORDER BY shardid1, shardid2, nodename, nodeport),
        count(distinct lsp.shardid1)
    FROM left_shard_placements lsp
    INTO table1_placements, table1_placement_count;

    -- Same for table2's shards. NOTE: this used to be a LEFT JOIN, but the
    -- WHERE clause on sp.shardstate discarded the NULL-extended rows anyway,
    -- so an INNER JOIN is equivalent and consistent with the query above.
    WITH colocated_shards AS (
        SELECT tba.shardid as shardid1, tbb.shardid as shardid2
        FROM pg_dist_shard tba JOIN pg_dist_shard tbb USING(shardminvalue, shardmaxvalue)
        WHERE tba.logicalrelid = table1 AND tbb.logicalrelid = table2),
    right_shard_placements AS (
        SELECT cs.shardid1, cs.shardid2, sp.nodename, sp.nodeport
        FROM colocated_shards cs JOIN pg_dist_shard_placement sp ON(cs.shardid2 = sp.shardid)
        WHERE sp.shardstate = 1)
    SELECT
        array_agg((rsp.shardid1, rsp.shardid2, rsp.nodename, rsp.nodeport)::colocation_placement_type
                  ORDER BY shardid1, shardid2, nodename, nodeport),
        count(distinct rsp.shardid2)
    FROM right_shard_placements rsp
    INTO table2_placements, table2_placement_count;

    -- Every shard must have at least one active placement, and both tables
    -- must cover the same number of shards with placements.
    IF (table1_shard_count != table1_placement_count
        OR table1_placement_count != table2_placement_count) THEN
        RETURN false;
    END IF;

    IF (array_length(table1_placements, 1) != array_length(table2_placements, 1)) THEN
        RETURN false;
    END IF;

    -- Placements must line up pairwise on the same node and port.
    FOR i IN 1..array_length(table1_placements,1) LOOP
        IF (table1_placements[i].nodename != table2_placements[i].nodename OR
            table1_placements[i].nodeport != table2_placements[i].nodeport) THEN
            RETURN false;
        END IF;
    END LOOP;

    RETURN true;
END;
$function$;
|
||||
|
||||
|
||||
--
-- citus_run_on_all_workers runs the given command on every active worker node
-- and returns one (nodename, nodeport, success, result) row per worker.
--
CREATE OR REPLACE FUNCTION citus_run_on_all_workers(command text,
                                                    parallel bool default true,
                                                    OUT nodename text,
                                                    OUT nodeport int,
                                                    OUT success bool,
                                                    OUT result text)
    RETURNS SETOF record
    LANGUAGE plpgsql
    AS $function$
DECLARE
    worker_names text[];
    worker_ports int[];
    worker_commands text[];
BEGIN
    -- Build parallel arrays: node names, ports, and the (identical) command,
    -- ordered deterministically by node name then port.
    SELECT array_agg(node_name ORDER BY node_name, node_port),
           array_agg(node_port ORDER BY node_name, node_port),
           array_agg(command ORDER BY node_name, node_port)
    INTO worker_names, worker_ports, worker_commands
    FROM master_get_active_worker_nodes();

    RETURN QUERY
        SELECT *
        FROM master_run_on_worker(worker_names, worker_ports, worker_commands,
                                  parallel);
END;
$function$;
|
||||
|
||||
|
||||
--
-- citus_run_on_all_placements runs the given command on every active shard
-- placement of the given table. A '%s' in the command is substituted with the
-- placement's shard name. Returns one row per placement.
--
CREATE OR REPLACE FUNCTION citus_run_on_all_placements(table_name regclass, command text,
                                                       parallel bool default true,
                                                       OUT nodename text,
                                                       OUT nodeport int,
                                                       OUT shardid bigint,
                                                       OUT success bool, OUT result text)
    RETURNS SETOF record
    LANGUAGE plpgsql
    AS $function$
DECLARE
    placement_nodes text[];
    placement_ports int[];
    placement_shards bigint[];
    placement_commands text[];
BEGIN
    -- One entry per active placement, ordered deterministically so the shard
    -- id array lines up with master_run_on_worker()'s output via ORDINALITY.
    WITH active_placements AS (
        SELECT ds.shardid AS shardid,
               shard_name(ds.logicalrelid, ds.shardid) AS shardname,
               dsp.nodename AS nodename,
               dsp.nodeport::int AS nodeport
        FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid)
        WHERE dsp.shardstate = 1 AND ds.logicalrelid::regclass = table_name
        ORDER BY ds.logicalrelid, ds.shardid, dsp.nodename, dsp.nodeport)
    SELECT array_agg(ap.nodename), array_agg(ap.nodeport), array_agg(ap.shardid),
           array_agg(format(command, ap.shardname))
    INTO placement_nodes, placement_ports, placement_shards, placement_commands
    FROM active_placements ap;

    RETURN QUERY
        SELECT r.node_name, r.node_port, placement_shards[ordinality],
               r.success, r.result
        FROM master_run_on_worker(placement_nodes, placement_ports,
                                  placement_commands, parallel) WITH ORDINALITY r;
END;
$function$;
|
||||
|
||||
|
||||
--
-- citus_run_on_all_colocated_placements runs the given command on each pair
-- of co-located shard placements of the two given tables. The two '%s'
-- placeholders in the command are substituted with the first and second
-- shard names respectively. Raises an error if the tables are not co-located.
--
CREATE OR REPLACE FUNCTION citus_run_on_all_colocated_placements(table_name1 regclass,
                                                                 table_name2 regclass,
                                                                 command text,
                                                                 parallel bool default true,
                                                                 OUT nodename text,
                                                                 OUT nodeport int,
                                                                 OUT shardid1 bigint,
                                                                 OUT shardid2 bigint,
                                                                 OUT success bool,
                                                                 OUT result text)
    RETURNS SETOF record
    LANGUAGE plpgsql
    AS $function$
DECLARE
    workers text[];
    ports int[];
    shards1 bigint[];
    shards2 bigint[];
    commands text[];
BEGIN
    IF NOT (SELECT citus_tables_colocated(table_name1, table_name2)) THEN
        RAISE EXCEPTION 'tables % and % are not co-located', table_name1, table_name2;
    END IF;

    -- Gather active placements of both tables, then pair up the placements
    -- whose shard ranges match and that live on the same node.
    WITH active_shard_placements AS (
        SELECT ds.logicalrelid,
               ds.shardid AS shardid,
               ds.shardminvalue AS shardminvalue,
               ds.shardmaxvalue AS shardmaxvalue,
               dsp.nodename AS nodename,
               dsp.nodeport::int AS nodeport
        FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid)
        WHERE dsp.shardstate = 1 AND (ds.logicalrelid::regclass = table_name1 OR
                                      ds.logicalrelid::regclass = table_name2)
        ORDER BY ds.logicalrelid, ds.shardid, dsp.nodename, dsp.nodeport),
    citus_colocated_placements AS (
        SELECT lhs.shardid AS shardid1,
               shard_name(lhs.logicalrelid, lhs.shardid) AS shardname1,
               rhs.shardid AS shardid2,
               shard_name(rhs.logicalrelid, rhs.shardid) AS shardname2,
               lhs.nodename AS nodename,
               lhs.nodeport::int AS nodeport
        FROM active_shard_placements lhs, active_shard_placements rhs
        WHERE lhs.shardminvalue = rhs.shardminvalue AND
              lhs.shardmaxvalue = rhs.shardmaxvalue AND
              lhs.logicalrelid != rhs.logicalrelid AND
              lhs.nodename = rhs.nodename AND
              lhs.nodeport = rhs.nodeport AND
              lhs.logicalrelid::regclass = table_name1 AND
              rhs.logicalrelid::regclass = table_name2
        ORDER BY lhs.logicalrelid, lhs.shardid, nodename, nodeport)
    SELECT array_agg(cp.nodename), array_agg(cp.nodeport), array_agg(cp.shardid1),
           array_agg(cp.shardid2), array_agg(format(command, cp.shardname1, cp.shardname2))
    INTO workers, ports, shards1, shards2, commands
    FROM citus_colocated_placements cp;

    RETURN QUERY
        SELECT r.node_name, r.node_port, shards1[ordinality],
               shards2[ordinality], r.success, r.result
        FROM master_run_on_worker(workers, ports, commands, parallel) WITH ORDINALITY r;
END;
$function$;
|
||||
|
||||
|
||||
--
-- citus_run_on_all_shards runs the given command once per shard of the given
-- table, against the first active placement of each shard. A '%s' in the
-- command is substituted with the shard name. Emits a NOTICE when some shards
-- have no active placement (those shards are skipped).
--
CREATE OR REPLACE FUNCTION citus_run_on_all_shards(table_name regclass, command text,
                                                   parallel bool default true,
                                                   OUT shardid bigint,
                                                   OUT success bool,
                                                   OUT result text)
    RETURNS SETOF record
    LANGUAGE plpgsql
    AS $function$
DECLARE
    workers text[];
    ports int[];
    shards bigint[];
    commands text[];
    shard_count int;
BEGIN
    SELECT COUNT(*) INTO shard_count FROM pg_dist_shard
    WHERE logicalrelid = table_name;

    -- One entry per shard that has at least one active placement; the first
    -- placement (per group) is the one the command runs on. NOTE: the join
    -- used to be a LEFT JOIN, but the WHERE clause on dsp.shardstate filtered
    -- the NULL-extended rows anyway, so an INNER JOIN is equivalent.
    WITH citus_shards AS (
        SELECT ds.shardid AS shardid,
               shard_name(ds.logicalrelid, ds.shardid) AS shardname,
               array_agg(dsp.nodename) AS nodenames,
               array_agg(dsp.nodeport) AS nodeports
        FROM pg_dist_shard ds JOIN pg_dist_shard_placement dsp USING (shardid)
        WHERE dsp.shardstate = 1 AND ds.logicalrelid::regclass = table_name
        GROUP BY ds.logicalrelid, ds.shardid
        ORDER BY ds.logicalrelid, ds.shardid)
    SELECT array_agg(cs.nodenames[1]), array_agg(cs.nodeports[1]), array_agg(cs.shardid),
           array_agg(format(command, cs.shardname))
    INTO workers, ports, shards, commands
    FROM citus_shards cs;

    -- array_length() is NULL for a NULL array, and "int != NULL" is NULL, so
    -- without COALESCE the notice was silently skipped when *no* shard had an
    -- active placement. Default to 0 so the check fires in that case too.
    IF (shard_count != coalesce(array_length(workers, 1), 0)) THEN
        RAISE NOTICE 'some shards do not have active placements';
    END IF;

    RETURN QUERY
        SELECT shards[ordinality], r.success, r.result
        FROM master_run_on_worker(workers, ports, commands, parallel) WITH ORDINALITY r;
END;
$function$;
|
|
@ -0,0 +1,566 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* master_citus_tools.c
|
||||
* UDF to run multi shard/worker queries
|
||||
*
|
||||
* This file contains functions to run commands on other worker/shards.
|
||||
*
|
||||
* Copyright (c) 2016-2016, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
#include "funcapi.h"
|
||||
#include "libpq-fe.h"
|
||||
#include "miscadmin.h"
|
||||
|
||||
|
||||
#include "access/htup_details.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "distributed/connection_cache.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
#include "distributed/worker_protocol.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "utils/builtins.h"
|
||||
|
||||
#include "distributed/multi_client_executor.h"
|
||||
|
||||
|
||||
PG_FUNCTION_INFO_V1(master_run_on_worker);
|
||||
|
||||
static int ParseCommandParameters(FunctionCallInfo fcinfo, StringInfo **nodeNameArray,
|
||||
int **nodePortsArray, StringInfo **commandStringArray,
|
||||
bool *parallel);
|
||||
static void ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray,
|
||||
int *nodePortArray,
|
||||
StringInfo *commandStringArray,
|
||||
bool *statusArray,
|
||||
StringInfo *resultStringArray,
|
||||
int commmandCount);
|
||||
static bool GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus,
|
||||
StringInfo queryResultString);
|
||||
static bool EvaluateQueryResult(PGconn *connection, PGresult *queryResult, StringInfo
|
||||
queryResultString);
|
||||
static void StoreErrorMessage(PGconn *connection, StringInfo queryResultString);
|
||||
static void ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray,
|
||||
int *nodePortArray,
|
||||
StringInfo *commandStringArray,
|
||||
bool *statusArray,
|
||||
StringInfo *resultStringArray,
|
||||
int commmandCount);
|
||||
static bool ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort,
|
||||
char *queryString, StringInfo queryResult);
|
||||
static Tuplestorestate * CreateTupleStore(TupleDesc tupleDescriptor,
|
||||
StringInfo *nodeNameArray, int *nodePortArray,
|
||||
bool *statusArray,
|
||||
StringInfo *resultArray, int commandCount);
|
||||
|
||||
|
||||
/*
|
||||
* master_run_on_worker executes queries/commands to run on specified worker and
|
||||
* returns success status and query/command result. Expected input is 3 arrays
|
||||
* containing node names, node ports, and query strings, and boolean flag to specify
|
||||
* parallel execution. The function then returns node_name, node_port, success,
|
||||
* result tuples upon completion of the query. The same user credentials are used
|
||||
* to connect to remote nodes.
|
||||
*/
|
||||
Datum
|
||||
master_run_on_worker(PG_FUNCTION_ARGS)
|
||||
{
|
||||
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
||||
MemoryContext per_query_ctx = NULL;
|
||||
MemoryContext oldcontext = NULL;
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
Tuplestorestate *tupleStore = NULL;
|
||||
bool parallelExecution = false;
|
||||
StringInfo *nodeNameArray = NULL;
|
||||
int *nodePortArray = NULL;
|
||||
StringInfo *commandStringArray = NULL;
|
||||
bool *statusArray = NULL;
|
||||
StringInfo *resultArray = NULL;
|
||||
int commandIndex = 0;
|
||||
int commandCount = 0;
|
||||
|
||||
/* check to see if caller supports us returning a tuplestore */
|
||||
if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize))
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("materialize mode required, but it is not "
|
||||
"allowed in this context")));
|
||||
}
|
||||
|
||||
commandCount = ParseCommandParameters(fcinfo, &nodeNameArray, &nodePortArray,
|
||||
&commandStringArray, ¶llelExecution);
|
||||
|
||||
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
|
||||
oldcontext = MemoryContextSwitchTo(per_query_ctx);
|
||||
|
||||
/* get the requested return tuple description */
|
||||
tupleDescriptor = CreateTupleDescCopy(rsinfo->expectedDesc);
|
||||
|
||||
/*
|
||||
* Check to make sure we have correct tuple descriptor
|
||||
*/
|
||||
if (tupleDescriptor->natts != 4 ||
|
||||
tupleDescriptor->attrs[0]->atttypid != TEXTOID ||
|
||||
tupleDescriptor->attrs[1]->atttypid != INT4OID ||
|
||||
tupleDescriptor->attrs[2]->atttypid != BOOLOID ||
|
||||
tupleDescriptor->attrs[3]->atttypid != TEXTOID)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_COLUMN_DEFINITION),
|
||||
errmsg("query-specified return tuple and "
|
||||
"function return type are not compatible")));
|
||||
}
|
||||
|
||||
/* prepare storage for status and result values */
|
||||
statusArray = palloc0(commandCount * sizeof(bool));
|
||||
resultArray = palloc0(commandCount * sizeof(StringInfo));
|
||||
for (commandIndex = 0; commandIndex < commandCount; commandIndex++)
|
||||
{
|
||||
resultArray[commandIndex] = makeStringInfo();
|
||||
}
|
||||
|
||||
if (parallelExecution)
|
||||
{
|
||||
ExecuteCommandsInParallelAndStoreResults(nodeNameArray, nodePortArray,
|
||||
commandStringArray,
|
||||
statusArray, resultArray, commandCount);
|
||||
}
|
||||
else
|
||||
{
|
||||
ExecuteCommandsAndStoreResults(nodeNameArray, nodePortArray, commandStringArray,
|
||||
statusArray, resultArray, commandCount);
|
||||
}
|
||||
|
||||
/* let the caller know we're sending back a tuplestore */
|
||||
rsinfo->returnMode = SFRM_Materialize;
|
||||
tupleStore = CreateTupleStore(tupleDescriptor,
|
||||
nodeNameArray, nodePortArray, statusArray,
|
||||
resultArray, commandCount);
|
||||
rsinfo->setResult = tupleStore;
|
||||
rsinfo->setDesc = tupleDescriptor;
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/* ParseCommandParameters reads call parameters and fills in data structures */
|
||||
static int
|
||||
ParseCommandParameters(FunctionCallInfo fcinfo, StringInfo **nodeNameArray,
|
||||
int **nodePortsArray, StringInfo **commandStringArray,
|
||||
bool *parallel)
|
||||
{
|
||||
ArrayType *nodeNameArrayObject = PG_GETARG_ARRAYTYPE_P(0);
|
||||
ArrayType *nodePortArrayObject = PG_GETARG_ARRAYTYPE_P(1);
|
||||
ArrayType *commandStringArrayObject = PG_GETARG_ARRAYTYPE_P(2);
|
||||
bool parallelExecution = PG_GETARG_BOOL(3);
|
||||
int nodeNameCount = ArrayObjectCount(nodeNameArrayObject);
|
||||
int nodePortCount = ArrayObjectCount(nodePortArrayObject);
|
||||
int commandStringCount = ArrayObjectCount(commandStringArrayObject);
|
||||
Datum *nodeNameDatumArray = DeconstructArrayObject(nodeNameArrayObject);
|
||||
Datum *nodePortDatumArray = DeconstructArrayObject(nodePortArrayObject);
|
||||
Datum *commandStringDatumArray = DeconstructArrayObject(commandStringArrayObject);
|
||||
int index = 0;
|
||||
StringInfo *nodeNames = NULL;
|
||||
int *nodePorts = NULL;
|
||||
StringInfo *commandStrings = NULL;
|
||||
|
||||
if (nodeNameCount != nodePortCount || nodeNameCount != commandStringCount)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("expected same number of node name, port, and query string")));
|
||||
}
|
||||
|
||||
nodeNames = palloc0(nodeNameCount * sizeof(StringInfo));
|
||||
nodePorts = palloc0(nodeNameCount * sizeof(int));
|
||||
commandStrings = palloc0(nodeNameCount * sizeof(StringInfo));
|
||||
|
||||
for (index = 0; index < nodeNameCount; index++)
|
||||
{
|
||||
text *nodeNameText = DatumGetTextP(nodeNameDatumArray[index]);
|
||||
char *nodeName = text_to_cstring(nodeNameText);
|
||||
int32 nodePort = DatumGetInt32(nodePortDatumArray[index]);
|
||||
text *commandText = DatumGetTextP(commandStringDatumArray[index]);
|
||||
char *commandString = text_to_cstring(commandText);
|
||||
|
||||
nodeNames[index] = makeStringInfo();
|
||||
commandStrings[index] = makeStringInfo();
|
||||
|
||||
appendStringInfo(nodeNames[index], "%s", nodeName);
|
||||
nodePorts[index] = nodePort;
|
||||
appendStringInfo(commandStrings[index], "%s", commandString);
|
||||
}
|
||||
|
||||
*nodeNameArray = nodeNames;
|
||||
*nodePortsArray = nodePorts;
|
||||
*commandStringArray = commandStrings;
|
||||
*parallel = parallelExecution;
|
||||
|
||||
return nodeNameCount;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExecuteCommandsInParellelAndStoreResults connects to each node specified in
|
||||
* nodeNameArray and nodePortArray, and executes command in commandStringArray
|
||||
* in parallel fashion. Execution success status and result is reported for
|
||||
* each command in statusArray and resultStringArray. Each array contains
|
||||
* commandCount items.
|
||||
*/
|
||||
static void
|
||||
ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePortArray,
|
||||
StringInfo *commandStringArray,
|
||||
bool *statusArray, StringInfo *resultStringArray,
|
||||
int commmandCount)
|
||||
{
|
||||
int commandIndex = 0;
|
||||
char *nodeUser = CurrentUserName();
|
||||
PGconn **connectionArray = palloc0(commmandCount * sizeof(PGconn *));
|
||||
int finishedCount = 0;
|
||||
|
||||
/* establish connections */
|
||||
for (commandIndex = 0; commandIndex < commmandCount; commandIndex++)
|
||||
{
|
||||
char *nodeName = nodeNameArray[commandIndex]->data;
|
||||
int nodePort = nodePortArray[commandIndex];
|
||||
PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser);
|
||||
StringInfo queryResultString = resultStringArray[commandIndex];
|
||||
|
||||
statusArray[commandIndex] = true;
|
||||
|
||||
connectionArray[commandIndex] = connection;
|
||||
|
||||
if (connection == NULL)
|
||||
{
|
||||
appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName,
|
||||
(int) nodePort);
|
||||
statusArray[commandIndex] = false;
|
||||
finishedCount++;
|
||||
}
|
||||
}
|
||||
|
||||
/* send queries at once */
|
||||
for (commandIndex = 0; commandIndex < commmandCount; commandIndex++)
|
||||
{
|
||||
int querySent = 0;
|
||||
PGconn *connection = connectionArray[commandIndex];
|
||||
char *queryString = commandStringArray[commandIndex]->data;
|
||||
StringInfo queryResultString = resultStringArray[commandIndex];
|
||||
|
||||
/*
|
||||
* If we don't have a connection, nothing to send, error string should already
|
||||
* been filled.
|
||||
*/
|
||||
if (connection == NULL)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
querySent = PQsendQuery(connection, queryString);
|
||||
|
||||
if (querySent == 0)
|
||||
{
|
||||
StoreErrorMessage(connection, queryResultString);
|
||||
statusArray[commandIndex] = false;
|
||||
PQfinish(connection);
|
||||
connectionArray[commandIndex] = NULL;
|
||||
finishedCount++;
|
||||
}
|
||||
}
|
||||
|
||||
/* check for query results */
|
||||
while (finishedCount < commmandCount)
|
||||
{
|
||||
for (commandIndex = 0; commandIndex < commmandCount; commandIndex++)
|
||||
{
|
||||
PGconn *connection = connectionArray[commandIndex];
|
||||
StringInfo queryResultString = resultStringArray[commandIndex];
|
||||
bool success = false;
|
||||
bool queryFinished = false;
|
||||
|
||||
if (connection == NULL)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
queryFinished = GetConnectionStatusAndResult(connection, &success,
|
||||
queryResultString);
|
||||
|
||||
if (queryFinished)
|
||||
{
|
||||
finishedCount++;
|
||||
statusArray[commandIndex] = success;
|
||||
connectionArray[commandIndex] = NULL;
|
||||
PQfinish(connection);
|
||||
}
|
||||
}
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
if (finishedCount < commmandCount)
|
||||
{
|
||||
long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L;
|
||||
pg_usleep(sleepIntervalPerCycle);
|
||||
}
|
||||
}
|
||||
|
||||
pfree(connectionArray);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetConnectionStatusAndResult checks the active connection and returns true if
|
||||
* query execution is finished (either success or fail).
|
||||
* Query success/fail in resultStatus, and query result in queryResultString are
|
||||
* reported upon completion of the query.
|
||||
*/
|
||||
static bool
|
||||
GetConnectionStatusAndResult(PGconn *connection, bool *resultStatus,
|
||||
StringInfo queryResultString)
|
||||
{
|
||||
bool finished = true;
|
||||
ConnStatusType connectionStatus = PQstatus(connection);
|
||||
int consumeInput = 0;
|
||||
PGresult *queryResult = NULL;
|
||||
bool success = false;
|
||||
|
||||
*resultStatus = false;
|
||||
resetStringInfo(queryResultString);
|
||||
|
||||
if (connectionStatus == CONNECTION_BAD)
|
||||
{
|
||||
appendStringInfo(queryResultString, "connection lost");
|
||||
return finished;
|
||||
}
|
||||
|
||||
consumeInput = PQconsumeInput(connection);
|
||||
if (consumeInput == 0)
|
||||
{
|
||||
appendStringInfo(queryResultString, "query result unavailable");
|
||||
return finished;
|
||||
}
|
||||
|
||||
/* check later if busy */
|
||||
if (PQisBusy(connection) != 0)
|
||||
{
|
||||
finished = false;
|
||||
return finished;
|
||||
}
|
||||
|
||||
/* query result is available at this point */
|
||||
queryResult = PQgetResult(connection);
|
||||
success = EvaluateQueryResult(connection, queryResult, queryResultString);
|
||||
PQclear(queryResult);
|
||||
|
||||
*resultStatus = success;
|
||||
finished = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* EvaluateQueryResult gets the query result from connection and returns
|
||||
* true if the query is executed successfully, false otherwise. A query result
|
||||
* or an error message is returned in queryResultString. The function requires
|
||||
* that the query returns a single column/single row result. It returns an
|
||||
* error otherwise.
|
||||
*/
|
||||
static bool
|
||||
EvaluateQueryResult(PGconn *connection, PGresult *queryResult,
|
||||
StringInfo queryResultString)
|
||||
{
|
||||
bool success = false;
|
||||
|
||||
ExecStatusType resultStatus = PQresultStatus(queryResult);
|
||||
if (resultStatus == PGRES_COMMAND_OK)
|
||||
{
|
||||
char *commandStatus = PQcmdStatus(queryResult);
|
||||
appendStringInfo(queryResultString, "%s", commandStatus);
|
||||
success = true;
|
||||
}
|
||||
else if (resultStatus == PGRES_TUPLES_OK)
|
||||
{
|
||||
int ntuples = PQntuples(queryResult);
|
||||
int nfields = PQnfields(queryResult);
|
||||
|
||||
/* error if query returns more than 1 rows, or more than 1 fields */
|
||||
if (nfields != 1)
|
||||
{
|
||||
appendStringInfo(queryResultString,
|
||||
"expected a single column in query target");
|
||||
}
|
||||
else if (ntuples > 1)
|
||||
{
|
||||
appendStringInfo(queryResultString,
|
||||
"expected a single row in query result");
|
||||
}
|
||||
else
|
||||
{
|
||||
int row = 0;
|
||||
int column = 0;
|
||||
if (!PQgetisnull(queryResult, row, column))
|
||||
{
|
||||
char *queryResultValue = PQgetvalue(queryResult, row, column);
|
||||
appendStringInfo(queryResultString, "%s", queryResultValue);
|
||||
}
|
||||
success = true;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
StoreErrorMessage(connection, queryResultString);
|
||||
}
|
||||
|
||||
return success;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StoreErrorMessage gets the error message from connection and stores it
|
||||
* in queryResultString. It should be called only when error is present
|
||||
* otherwise it would return a default error message.
|
||||
*/
|
||||
static void
|
||||
StoreErrorMessage(PGconn *connection, StringInfo queryResultString)
|
||||
{
|
||||
char *errorMessage = PQerrorMessage(connection);
|
||||
char *firstNewlineIndex = strchr(errorMessage, '\n');
|
||||
|
||||
/* trim the error message at the line break */
|
||||
if (firstNewlineIndex != NULL)
|
||||
{
|
||||
*firstNewlineIndex = '\0';
|
||||
}
|
||||
|
||||
/* put a default error message if no error message is reported */
|
||||
if (errorMessage == NULL)
|
||||
{
|
||||
errorMessage = "An error occurred while running the query";
|
||||
}
|
||||
|
||||
appendStringInfo(queryResultString, "%s", errorMessage);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExecuteCommandsAndStoreResults connects to each node specified in
|
||||
* nodeNameArray and nodePortArray, and executes command in commandStringArray
|
||||
* in sequential order. Execution success status and result is reported for
|
||||
* each command in statusArray and resultStringArray. Each array contains
|
||||
* commandCount items.
|
||||
*/
|
||||
static void
|
||||
ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray, int *nodePortArray,
|
||||
StringInfo *commandStringArray, bool *statusArray,
|
||||
StringInfo *resultStringArray, int commmandCount)
|
||||
{
|
||||
int commandIndex = 0;
|
||||
for (commandIndex = 0; commandIndex < commmandCount; commandIndex++)
|
||||
{
|
||||
char *nodeName = nodeNameArray[commandIndex]->data;
|
||||
int32 nodePort = nodePortArray[commandIndex];
|
||||
bool success = false;
|
||||
char *queryString = commandStringArray[commandIndex]->data;
|
||||
StringInfo queryResultString = resultStringArray[commandIndex];
|
||||
|
||||
success = ExecuteRemoteQueryOrCommand(nodeName, nodePort, queryString,
|
||||
queryResultString);
|
||||
|
||||
statusArray[commandIndex] = success;
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExecuteRemoteQueryOrCommand executes a query at specified remote node using
|
||||
* the calling user's credentials. The function returns the query status
|
||||
* (success/failure), and query result. The query is expected to return a single
|
||||
* target containing zero or one rows.
|
||||
*/
|
||||
static bool
|
||||
ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString,
|
||||
StringInfo queryResultString)
|
||||
{
|
||||
char *nodeUser = CurrentUserName();
|
||||
PGconn *nodeConnection = ConnectToNode(nodeName, nodePort, nodeUser);
|
||||
bool success = false;
|
||||
|
||||
if (nodeConnection == NULL)
|
||||
{
|
||||
appendStringInfo(queryResultString, "failed to connect to %s:%d", nodeName,
|
||||
(int) nodePort);
|
||||
return false;
|
||||
}
|
||||
|
||||
PG_TRY();
|
||||
{
|
||||
PGresult *queryResult = PQexec(nodeConnection, queryString);
|
||||
success = EvaluateQueryResult(nodeConnection, queryResult, queryResultString);
|
||||
|
||||
PQclear(queryResult);
|
||||
|
||||
/* close the connection */
|
||||
PQfinish(nodeConnection);
|
||||
nodeConnection = NULL;
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
StoreErrorMessage(nodeConnection, queryResultString);
|
||||
|
||||
/* close the connection */
|
||||
PQfinish(nodeConnection);
|
||||
nodeConnection = NULL;
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
return success;
|
||||
}
|
||||
|
||||
|
||||
/* CreateTupleStore prepares result tuples from individual query results */
|
||||
static Tuplestorestate *
|
||||
CreateTupleStore(TupleDesc tupleDescriptor,
|
||||
StringInfo *nodeNameArray, int *nodePortArray, bool *statusArray,
|
||||
StringInfo *resultArray, int commandCount)
|
||||
{
|
||||
Tuplestorestate *tupleStore = tuplestore_begin_heap(true, false, work_mem);
|
||||
int commandIndex = 0;
|
||||
bool nulls[4] = { false, false, false, false };
|
||||
|
||||
for (commandIndex = 0; commandIndex < commandCount; commandIndex++)
|
||||
{
|
||||
Datum values[4];
|
||||
HeapTuple tuple = NULL;
|
||||
StringInfo nodeNameString = nodeNameArray[commandIndex];
|
||||
StringInfo resultString = resultArray[commandIndex];
|
||||
text *nodeNameText = cstring_to_text_with_len(nodeNameString->data,
|
||||
nodeNameString->len);
|
||||
text *resultText = cstring_to_text_with_len(resultString->data,
|
||||
resultString->len);
|
||||
|
||||
values[0] = PointerGetDatum(nodeNameText);
|
||||
values[1] = Int32GetDatum(nodePortArray[commandIndex]);
|
||||
values[2] = BoolGetDatum(statusArray[commandIndex]);
|
||||
values[3] = PointerGetDatum(resultText);
|
||||
|
||||
tuple = heap_form_tuple(tupleDescriptor, values, nulls);
|
||||
tuplestore_puttuple(tupleStore, tuple);
|
||||
|
||||
heap_freetuple(tuple);
|
||||
pfree(nodeNameText);
|
||||
pfree(resultText);
|
||||
}
|
||||
|
||||
tuplestore_donestoring(tupleStore);
|
||||
|
||||
return tupleStore;
|
||||
}
|
|
@ -0,0 +1,370 @@
|
|||
--
|
||||
-- MULTI CITUS TOOLS
|
||||
--
|
||||
-- tests UDFs created for citus tools
|
||||
--
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1240000;
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1240000;
|
||||
-- the function is not exposed explicitly, create the entry point
|
||||
CREATE OR REPLACE FUNCTION master_run_on_worker(worker_name text[], port integer[],
|
||||
command text[],
|
||||
parallel boolean default false,
|
||||
OUT node_name text, OUT node_port integer,
|
||||
OUT success boolean, OUT result text)
|
||||
RETURNS SETOF record
|
||||
LANGUAGE C STABLE STRICT
|
||||
AS 'citus.so', $$master_run_on_worker$$;
|
||||
-- test with invalid port, prevent OS dependent warning from being displayed
|
||||
SET client_min_messages to ERROR;
|
||||
SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[],
|
||||
ARRAY['select count(*) from pg_dist_shard']::text[],
|
||||
false);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+------------------------------------
|
||||
localhost | 666 | f | failed to connect to localhost:666
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[],
|
||||
ARRAY['select count(*) from pg_dist_shard']::text[],
|
||||
true);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+------------------------------------
|
||||
localhost | 666 | f | failed to connect to localhost:666
|
||||
(1 row)
|
||||
|
||||
RESET client_min_messages;
|
||||
-- store worker node name and port
|
||||
SELECT quote_literal(node_name) as node_name, node_port as node_port
|
||||
FROM master_get_active_worker_nodes()
|
||||
ORDER BY node_port
|
||||
LIMIT 1 \gset
|
||||
-- connect to the first worker and ask for shard count, should return 0
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select count(*) from pg_dist_shard']::text[],
|
||||
false);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+--------
|
||||
localhost | 57637 | t | 0
|
||||
(1 row)
|
||||
|
||||
-- connect to the first worker and ask for shards, should fail with
|
||||
-- expecting a single column error
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select * from pg_dist_shard']::text[],
|
||||
false);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+------------------------------------------
|
||||
localhost | 57637 | f | expected a single column in query target
|
||||
(1 row)
|
||||
|
||||
-- query result may only contain a single row
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select a from generate_series(1,2) a']::text[],
|
||||
false);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+---------------------------------------
|
||||
localhost | 57637 | f | expected a single row in query result
|
||||
(1 row)
|
||||
|
||||
|
||||
-- send multiple queries
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
|
||||
ARRAY[:node_port, :node_port]::int[],
|
||||
ARRAY['select a from generate_series(1,1) a',
|
||||
'select a from generate_series(2,2) a']::text[],
|
||||
false);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+--------
|
||||
localhost | 57637 | t | 1
|
||||
localhost | 57637 | t | 2
|
||||
(2 rows)
|
||||
|
||||
-- send multiple queries, one fails
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
|
||||
ARRAY[:node_port, :node_port]::int[],
|
||||
ARRAY['select a from generate_series(1,1) a',
|
||||
'select a from generate_series(1,2) a']::text[],
|
||||
false);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+---------------------------------------
|
||||
localhost | 57637 | t | 1
|
||||
localhost | 57637 | f | expected a single row in query result
|
||||
(2 rows)
|
||||
|
||||
-- send multiple queries, both fail
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
|
||||
ARRAY[:node_port, :node_port]::int[],
|
||||
ARRAY['select a from generate_series(1,2) a',
|
||||
'select a from generate_series(1,2) a']::text[],
|
||||
false);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+---------------------------------------
|
||||
localhost | 57637 | f | expected a single row in query result
|
||||
localhost | 57637 | f | expected a single row in query result
|
||||
(2 rows)
|
||||
|
||||
-- can create tables at worker
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
|
||||
ARRAY[:node_port, :node_port]::int[],
|
||||
ARRAY['create table first_table(a int, b int)',
|
||||
'create table second_table(a int, b int)']::text[],
|
||||
false);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+--------------
|
||||
localhost | 57637 | t | CREATE TABLE
|
||||
localhost | 57637 | t | CREATE TABLE
|
||||
(2 rows)
|
||||
|
||||
-- can insert into table
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['insert into first_table select a,a from generate_series(1,20) a']::text[],
|
||||
false);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+-------------
|
||||
localhost | 57637 | t | INSERT 0 20
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select count(*) from first_table']::text[],
|
||||
false);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+--------
|
||||
localhost | 57637 | t | 20
|
||||
(1 row)
|
||||
|
||||
-- insert into second table twice
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['insert into second_table select * from first_table']::text[],
|
||||
false);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+-------------
|
||||
localhost | 57637 | t | INSERT 0 20
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['insert into second_table select * from first_table']::text[],
|
||||
false);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+-------------
|
||||
localhost | 57637 | t | INSERT 0 20
|
||||
(1 row)
|
||||
|
||||
-- check inserted values at second table
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select count(*) from second_table']::text[],
|
||||
false);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+--------
|
||||
localhost | 57637 | t | 40
|
||||
(1 row)
|
||||
|
||||
-- store worker node name and port again
|
||||
-- previously set variables become unusable after some number of uses
|
||||
SELECT quote_literal(node_name) as node_name, node_port as node_port
|
||||
FROM master_get_active_worker_nodes()
|
||||
ORDER BY node_port
|
||||
LIMIT 1 \gset
|
||||
-- create index on tables
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['create index first_table_index on first_table(a)']::text[],
|
||||
false);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+--------------
|
||||
localhost | 57637 | t | CREATE INDEX
|
||||
(1 row)
|
||||
|
||||
-- drop created tables
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['drop table first_table']::text[],
|
||||
false);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+------------
|
||||
localhost | 57637 | t | DROP TABLE
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['drop table second_table']::text[],
|
||||
false);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+------------
|
||||
localhost | 57637 | t | DROP TABLE
|
||||
(1 row)
|
||||
|
||||
|
||||
-- verify table is dropped
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select count(*) from second_table']::text[],
|
||||
false);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+------------------------------------------------
|
||||
localhost | 57637 | f | ERROR: relation "second_table" does not exist
|
||||
(1 row)
|
||||
|
||||
--
|
||||
-- Run the same tests in parallel
|
||||
--
|
||||
-- connect to the first worker and ask for shard count, should return 0
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select count(*) from pg_dist_shard']::text[],
|
||||
true);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+--------
|
||||
localhost | 57637 | t | 0
|
||||
(1 row)
|
||||
|
||||
-- connect to the first worker and ask for shards, should fail with
|
||||
-- expecting a single column error
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select * from pg_dist_shard']::text[],
|
||||
true);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+------------------------------------------
|
||||
localhost | 57637 | f | expected a single column in query target
|
||||
(1 row)
|
||||
|
||||
-- query result may only contain a single row
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select a from generate_series(1,2) a']::text[],
|
||||
true);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+---------------------------------------
|
||||
localhost | 57637 | f | expected a single row in query result
|
||||
(1 row)
|
||||
|
||||
|
||||
-- send multiple queries
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
|
||||
ARRAY[:node_port, :node_port]::int[],
|
||||
ARRAY['select a from generate_series(1,1) a',
|
||||
'select a from generate_series(2,2) a']::text[],
|
||||
true);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+--------
|
||||
localhost | 57637 | t | 1
|
||||
localhost | 57637 | t | 2
|
||||
(2 rows)
|
||||
|
||||
-- send multiple queries, one fails
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
|
||||
ARRAY[:node_port, :node_port]::int[],
|
||||
ARRAY['select a from generate_series(1,1) a',
|
||||
'select a from generate_series(1,2) a']::text[],
|
||||
true);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+---------------------------------------
|
||||
localhost | 57637 | t | 1
|
||||
localhost | 57637 | f | expected a single row in query result
|
||||
(2 rows)
|
||||
|
||||
-- send multiple queries, both fail
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
|
||||
ARRAY[:node_port, :node_port]::int[],
|
||||
ARRAY['select a from generate_series(1,2) a',
|
||||
'select a from generate_series(1,2) a']::text[],
|
||||
true);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+---------------------------------------
|
||||
localhost | 57637 | f | expected a single row in query result
|
||||
localhost | 57637 | f | expected a single row in query result
|
||||
(2 rows)
|
||||
|
||||
-- can create tables at worker
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
|
||||
ARRAY[:node_port, :node_port]::int[],
|
||||
ARRAY['create table first_table(a int, b int)',
|
||||
'create table second_table(a int, b int)']::text[],
|
||||
true);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+--------------
|
||||
localhost | 57637 | t | CREATE TABLE
|
||||
localhost | 57637 | t | CREATE TABLE
|
||||
(2 rows)
|
||||
|
||||
-- store worker node name and port again
|
||||
-- previously set variables become unusable after some number of uses
|
||||
SELECT quote_literal(node_name) as node_name, node_port as node_port
|
||||
FROM master_get_active_worker_nodes()
|
||||
ORDER BY node_port
|
||||
LIMIT 1 \gset
|
||||
-- can insert into table
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['insert into first_table select a,a from generate_series(1,20) a']::text[],
|
||||
true);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+-------------
|
||||
localhost | 57637 | t | INSERT 0 20
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select count(*) from first_table']::text[],
|
||||
true);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+--------
|
||||
localhost | 57637 | t | 20
|
||||
(1 row)
|
||||
|
||||
-- insert into second table twice
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['insert into second_table select * from first_table']::text[],
|
||||
true);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+-------------
|
||||
localhost | 57637 | t | INSERT 0 20
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['insert into second_table select * from first_table']::text[],
|
||||
true);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+-------------
|
||||
localhost | 57637 | t | INSERT 0 20
|
||||
(1 row)
|
||||
|
||||
-- check inserted values at second table
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select count(*) from second_table']::text[],
|
||||
true);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+--------
|
||||
localhost | 57637 | t | 40
|
||||
(1 row)
|
||||
|
||||
-- create index on tables
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['create index first_table_index on first_table(a)']::text[],
|
||||
true);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+--------------
|
||||
localhost | 57637 | t | CREATE INDEX
|
||||
(1 row)
|
||||
|
||||
-- drop created tables
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['drop table first_table']::text[],
|
||||
true);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+------------
|
||||
localhost | 57637 | t | DROP TABLE
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['drop table second_table']::text[],
|
||||
true);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+------------
|
||||
localhost | 57637 | t | DROP TABLE
|
||||
(1 row)
|
||||
|
||||
-- verify table is dropped
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select count(*) from second_table']::text[],
|
||||
true);
|
||||
node_name | node_port | success | result
|
||||
-----------+-----------+---------+------------------------------------------------
|
||||
localhost | 57637 | f | ERROR: relation "second_table" does not exist
|
||||
(1 row)
|
||||
|
||||
-- drop the function after use
|
||||
DROP FUNCTION master_run_on_worker(worker_name text[], port integer[], command text[],
|
||||
parallel boolean, OUT node_name text, OUT node_port integer,
|
||||
OUT success boolean, OUT result text);
|
|
@ -183,3 +183,8 @@ test: multi_expire_table_cache
|
|||
# ----------
|
||||
test: multi_colocation_utils
|
||||
test: multi_colocated_shard_transfer
|
||||
|
||||
# ----------
|
||||
# multi_citus_tools tests utility functions written for citus tools
|
||||
# ----------
|
||||
test: multi_citus_tools
|
||||
|
|
|
@ -0,0 +1,216 @@
|
|||
--
|
||||
-- MULTI CITUS TOOLS
|
||||
--
|
||||
-- tests UDFs created for citus tools
|
||||
--
|
||||
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1240000;
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1240000;
|
||||
|
||||
-- the function is not exposed explicitly, create the entry point
|
||||
CREATE OR REPLACE FUNCTION master_run_on_worker(worker_name text[], port integer[],
|
||||
command text[],
|
||||
parallel boolean default false,
|
||||
OUT node_name text, OUT node_port integer,
|
||||
OUT success boolean, OUT result text)
|
||||
RETURNS SETOF record
|
||||
LANGUAGE C STABLE STRICT
|
||||
AS 'citus.so', $$master_run_on_worker$$;
|
||||
|
||||
-- test with invalid port, prevent OS dependent warning from being displayed
|
||||
SET client_min_messages to ERROR;
|
||||
SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[],
|
||||
ARRAY['select count(*) from pg_dist_shard']::text[],
|
||||
false);
|
||||
|
||||
SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[],
|
||||
ARRAY['select count(*) from pg_dist_shard']::text[],
|
||||
true);
|
||||
RESET client_min_messages;
|
||||
|
||||
-- store worker node name and port
|
||||
SELECT quote_literal(node_name) as node_name, node_port as node_port
|
||||
FROM master_get_active_worker_nodes()
|
||||
ORDER BY node_port
|
||||
LIMIT 1 \gset
|
||||
|
||||
-- connect to the first worker and ask for shard count, should return 0
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select count(*) from pg_dist_shard']::text[],
|
||||
false);
|
||||
|
||||
-- connect to the first worker and ask for shards, should fail with
|
||||
-- expecting a single column error
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select * from pg_dist_shard']::text[],
|
||||
false);
|
||||
|
||||
-- query result may only contain a single row
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select a from generate_series(1,2) a']::text[],
|
||||
false);
|
||||
|
||||
-- send multiple queries
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
|
||||
ARRAY[:node_port, :node_port]::int[],
|
||||
ARRAY['select a from generate_series(1,1) a',
|
||||
'select a from generate_series(2,2) a']::text[],
|
||||
false);
|
||||
|
||||
-- send multiple queries, one fails
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
|
||||
ARRAY[:node_port, :node_port]::int[],
|
||||
ARRAY['select a from generate_series(1,1) a',
|
||||
'select a from generate_series(1,2) a']::text[],
|
||||
false);
|
||||
|
||||
-- send multiple queries, both fail
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
|
||||
ARRAY[:node_port, :node_port]::int[],
|
||||
ARRAY['select a from generate_series(1,2) a',
|
||||
'select a from generate_series(1,2) a']::text[],
|
||||
false);
|
||||
-- can create tables at worker
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
|
||||
ARRAY[:node_port, :node_port]::int[],
|
||||
ARRAY['create table first_table(a int, b int)',
|
||||
'create table second_table(a int, b int)']::text[],
|
||||
false);
|
||||
|
||||
-- can insert into table
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['insert into first_table select a,a from generate_series(1,20) a']::text[],
|
||||
false);
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select count(*) from first_table']::text[],
|
||||
false);
|
||||
-- insert into second table twice
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['insert into second_table select * from first_table']::text[],
|
||||
false);
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['insert into second_table select * from first_table']::text[],
|
||||
false);
|
||||
|
||||
-- check inserted values at second table
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select count(*) from second_table']::text[],
|
||||
false);
|
||||
-- store worker node name and port again
|
||||
-- previously set variables become unusable after some number of uses
|
||||
SELECT quote_literal(node_name) as node_name, node_port as node_port
|
||||
FROM master_get_active_worker_nodes()
|
||||
ORDER BY node_port
|
||||
LIMIT 1 \gset
|
||||
|
||||
-- create index on tables
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['create index first_table_index on first_table(a)']::text[],
|
||||
false);
|
||||
-- drop created tables
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['drop table first_table']::text[],
|
||||
false);
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['drop table second_table']::text[],
|
||||
false);
|
||||
|
||||
-- verify table is dropped
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select count(*) from second_table']::text[],
|
||||
false);
|
||||
--
|
||||
-- Run the same tests in parallel
|
||||
--
|
||||
|
||||
-- connect to the first worker and ask for shard count, should return 0
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select count(*) from pg_dist_shard']::text[],
|
||||
true);
|
||||
|
||||
-- connect to the first worker and ask for shards, should fail with
|
||||
-- expecting a single column error
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select * from pg_dist_shard']::text[],
|
||||
true);
|
||||
|
||||
-- query result may only contain a single row
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select a from generate_series(1,2) a']::text[],
|
||||
true);
|
||||
|
||||
-- send multiple queries
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
|
||||
ARRAY[:node_port, :node_port]::int[],
|
||||
ARRAY['select a from generate_series(1,1) a',
|
||||
'select a from generate_series(2,2) a']::text[],
|
||||
true);
|
||||
|
||||
-- send multiple queries, one fails
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
|
||||
ARRAY[:node_port, :node_port]::int[],
|
||||
ARRAY['select a from generate_series(1,1) a',
|
||||
'select a from generate_series(1,2) a']::text[],
|
||||
true);
|
||||
|
||||
-- send multiple queries, both fail
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
|
||||
ARRAY[:node_port, :node_port]::int[],
|
||||
ARRAY['select a from generate_series(1,2) a',
|
||||
'select a from generate_series(1,2) a']::text[],
|
||||
true);
|
||||
-- can create tables at worker
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[],
|
||||
ARRAY[:node_port, :node_port]::int[],
|
||||
ARRAY['create table first_table(a int, b int)',
|
||||
'create table second_table(a int, b int)']::text[],
|
||||
true);
|
||||
|
||||
-- store worker node name and port again
|
||||
-- previously set variables become unusable after some number of uses
|
||||
SELECT quote_literal(node_name) as node_name, node_port as node_port
|
||||
FROM master_get_active_worker_nodes()
|
||||
ORDER BY node_port
|
||||
LIMIT 1 \gset
|
||||
|
||||
-- can insert into table
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['insert into first_table select a,a from generate_series(1,20) a']::text[],
|
||||
true);
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select count(*) from first_table']::text[],
|
||||
true);
|
||||
-- insert into second table twice
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['insert into second_table select * from first_table']::text[],
|
||||
true);
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['insert into second_table select * from first_table']::text[],
|
||||
true);
|
||||
|
||||
-- check inserted values at second table
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select count(*) from second_table']::text[],
|
||||
true);
|
||||
|
||||
-- create index on tables
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['create index first_table_index on first_table(a)']::text[],
|
||||
true);
|
||||
-- drop created tables
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['drop table first_table']::text[],
|
||||
true);
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['drop table second_table']::text[],
|
||||
true);
|
||||
-- verify table is dropped
|
||||
SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[],
|
||||
ARRAY['select count(*) from second_table']::text[],
|
||||
true);
|
||||
|
||||
-- drop the function after use
|
||||
DROP FUNCTION master_run_on_worker(worker_name text[], port integer[], command text[],
|
||||
parallel boolean, OUT node_name text, OUT node_port integer,
|
||||
OUT success boolean, OUT result text);
|
||||
|
Loading…
Reference in New Issue