mirror of https://github.com/citusdata/citus.git
commit
28acab9d02
|
@ -253,6 +253,31 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
|||
}
|
||||
#endif
|
||||
|
||||
if (IsA(parsetree, DoStmt))
|
||||
{
|
||||
/*
|
||||
* All statements in a DO block are executed in a single transaciton,
|
||||
* so we need to keep track of whether we are inside a DO block.
|
||||
*/
|
||||
DoBlockLevel += 1;
|
||||
|
||||
PG_TRY();
|
||||
{
|
||||
standard_ProcessUtility(pstmt, queryString, context,
|
||||
params, queryEnv, dest, completionTag);
|
||||
|
||||
DoBlockLevel -= 1;
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
DoBlockLevel -= 1;
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/* process SET LOCAL stmts of whitelisted GUCs in multi-stmt xacts */
|
||||
if (IsA(parsetree, VariableSetStmt))
|
||||
{
|
||||
|
|
|
@ -87,6 +87,9 @@ bool EnableDeadlockPrevention = true;
|
|||
/* number of nested stored procedure call levels we are currently in */
|
||||
int StoredProcedureLevel = 0;
|
||||
|
||||
/* number of nested DO block levels we are currently in */
|
||||
int DoBlockLevel = 0;
|
||||
|
||||
/* sort the returning to get consistent outputs */
|
||||
bool SortReturning = false;
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "catalog/pg_type.h"
|
||||
#include "distributed/citus_nodefuncs.h"
|
||||
#include "distributed/citus_nodes.h"
|
||||
#include "distributed/function_call_delegation.h"
|
||||
#include "distributed/insert_select_planner.h"
|
||||
#include "distributed/intermediate_results.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
|
@ -192,6 +193,14 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
AdjustPartitioningForDistributedPlanning(rangeTableList,
|
||||
setPartitionedTablesInherited);
|
||||
}
|
||||
else
|
||||
{
|
||||
DistributedPlan *delegatePlan = TryToDelegateFunctionCall(parse);
|
||||
if (delegatePlan != NULL)
|
||||
{
|
||||
result = FinalizePlan(result, delegatePlan);
|
||||
}
|
||||
}
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
|
|
|
@ -0,0 +1,287 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* function_call_delegation.c
|
||||
* Planning logic for delegating a function call to a worker when the
|
||||
* function was distributed with a distribution argument and the worker
|
||||
* has metadata.
|
||||
*
|
||||
* Copyright (c), Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "catalog/pg_proc.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "distributed/citus_custom_scan.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/commands.h"
|
||||
#include "distributed/commands/multi_copy.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/function_call_delegation.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/shard_pruning.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
#include "nodes/nodeFuncs.h"
|
||||
#include "nodes/parsenodes.h"
|
||||
#include "nodes/primnodes.h"
|
||||
#include "optimizer/clauses.h"
|
||||
#include "parser/parse_coerce.h"
|
||||
#if PG_VERSION_NUM >= 120000
|
||||
#include "parser/parsetree.h"
|
||||
#endif
|
||||
#include "miscadmin.h"
|
||||
#include "tcop/dest.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/syscache.h"
|
||||
|
||||
static bool contain_param_walker(Node *node, void *context);
|
||||
|
||||
/*
|
||||
* contain_param_walker scans node for Param nodes.
|
||||
* returns whether any such nodes found.
|
||||
*/
|
||||
static bool
|
||||
contain_param_walker(Node *node, void *context)
|
||||
{
|
||||
return IsA(node, Param);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TryToDelegateFunctionCall calls a function on the worker if possible.
|
||||
* We only support delegating the SELECT func(...) form for distributed
|
||||
* functions colocated by distributed tables, and not more complicated
|
||||
* forms involving multiple function calls, FROM clauses, WHERE clauses,
|
||||
* ... Those complex forms are handled in the coordinator.
|
||||
*/
|
||||
DistributedPlan *
|
||||
TryToDelegateFunctionCall(Query *query)
|
||||
{
|
||||
FromExpr *joinTree = NULL;
|
||||
List *targetList = NIL;
|
||||
TargetEntry *targetEntry = NULL;
|
||||
FuncExpr *funcExpr = NULL;
|
||||
DistObjectCacheEntry *procedure = NULL;
|
||||
Oid colocatedRelationId = InvalidOid;
|
||||
Const *partitionValue = NULL;
|
||||
Datum partitionValueDatum = 0;
|
||||
ShardInterval *shardInterval = NULL;
|
||||
List *placementList = NIL;
|
||||
DistTableCacheEntry *distTable = NULL;
|
||||
Var *partitionColumn = NULL;
|
||||
ShardPlacement *placement = NULL;
|
||||
WorkerNode *workerNode = NULL;
|
||||
StringInfo queryString = NULL;
|
||||
Task *task = NULL;
|
||||
Job *job = NULL;
|
||||
DistributedPlan *distributedPlan = NULL;
|
||||
int32 groupId = 0;
|
||||
|
||||
if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG4))
|
||||
{
|
||||
/* Citus is not ready to determine whether function is distributed */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
groupId = GetLocalGroupId();
|
||||
if (groupId != 0 || groupId == GROUP_ID_UPGRADING)
|
||||
{
|
||||
/* do not delegate from workers, or while upgrading */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (query == NULL)
|
||||
{
|
||||
/* no query (mostly here to be defensive) */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
joinTree = query->jointree;
|
||||
if (joinTree == NULL)
|
||||
{
|
||||
/* no join tree (mostly here to be defensive) */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (joinTree->quals != NULL)
|
||||
{
|
||||
/* query has a WHERE section */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (joinTree->fromlist != NIL)
|
||||
{
|
||||
/* query has a FROM section */
|
||||
#if PG_VERSION_NUM >= 120000
|
||||
|
||||
/* in pg12 empty FROMs are represented with an RTE_RESULT */
|
||||
if (list_length(joinTree->fromlist) == 1)
|
||||
{
|
||||
RangeTblRef *reference = linitial(joinTree->fromlist);
|
||||
RangeTblEntry *rtentry = rt_fetch(reference->rtindex, query->rtable);
|
||||
if (rtentry->rtekind != RTE_RESULT)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
#else
|
||||
return NULL;
|
||||
#endif
|
||||
}
|
||||
|
||||
targetList = query->targetList;
|
||||
if (list_length(query->targetList) != 1)
|
||||
{
|
||||
/* multiple target list items */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
targetEntry = (TargetEntry *) linitial(targetList);
|
||||
if (!IsA(targetEntry->expr, FuncExpr))
|
||||
{
|
||||
/* target list item is not a function call */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
funcExpr = (FuncExpr *) targetEntry->expr;
|
||||
procedure = LookupDistObjectCacheEntry(ProcedureRelationId, funcExpr->funcid, 0);
|
||||
if (procedure == NULL)
|
||||
{
|
||||
/* not a distributed function call */
|
||||
ereport(DEBUG4, (errmsg("function is not distributed")));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (IsMultiStatementTransaction())
|
||||
{
|
||||
/* cannot delegate function calls in a multi-statement transaction */
|
||||
ereport(DEBUG1, (errmsg("not pushing down function calls in "
|
||||
"a multi-statement transaction")));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (procedure->distributionArgIndex < 0 ||
|
||||
procedure->distributionArgIndex >= list_length(funcExpr->args))
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("function call does not have a distribution argument")));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (contain_volatile_functions((Node *) funcExpr->args))
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("arguments in a distributed function must "
|
||||
"be constant expressions")));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (expression_tree_walker((Node *) funcExpr->args, contain_param_walker, NULL))
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("arguments in a distributed function must "
|
||||
"not contain subqueries")));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
colocatedRelationId = ColocatedTableId(procedure->colocationId);
|
||||
if (colocatedRelationId == InvalidOid)
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("function does not have co-located tables")));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
distTable = DistributedTableCacheEntry(colocatedRelationId);
|
||||
partitionColumn = distTable->partitionColumn;
|
||||
if (partitionColumn == NULL)
|
||||
{
|
||||
/* This can happen if colocated with a reference table. Punt for now. */
|
||||
ereport(DEBUG1, (errmsg(
|
||||
"cannnot push down function call for reference tables")));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
partitionValue = (Const *) list_nth(funcExpr->args, procedure->distributionArgIndex);
|
||||
if (!IsA(partitionValue, Const))
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("distribution argument value must be a constant")));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
partitionValueDatum = partitionValue->constvalue;
|
||||
|
||||
if (partitionValue->consttype != partitionColumn->vartype)
|
||||
{
|
||||
CopyCoercionData coercionData;
|
||||
|
||||
ConversionPathForTypes(partitionValue->consttype, partitionColumn->vartype,
|
||||
&coercionData);
|
||||
|
||||
partitionValueDatum = CoerceColumnValue(partitionValueDatum, &coercionData);
|
||||
}
|
||||
|
||||
shardInterval = FindShardInterval(partitionValueDatum, distTable);
|
||||
if (shardInterval == NULL)
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("cannot push down call, failed to find shard interval")));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
placementList = FinalizedShardPlacementList(shardInterval->shardId);
|
||||
if (list_length(placementList) != 1)
|
||||
{
|
||||
/* punt on this for now */
|
||||
ereport(DEBUG1, (errmsg(
|
||||
"cannot push down function call for replicated distributed tables")));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
placement = (ShardPlacement *) linitial(placementList);
|
||||
workerNode = FindWorkerNode(placement->nodeName, placement->nodePort);
|
||||
|
||||
if (workerNode == NULL || !workerNode->hasMetadata || !workerNode->metadataSynced)
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("the worker node does not have metadata")));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ereport(DEBUG1, (errmsg("pushing down the function call")));
|
||||
|
||||
queryString = makeStringInfo();
|
||||
pg_get_query_def(query, queryString);
|
||||
|
||||
task = CitusMakeNode(Task);
|
||||
task->taskType = SQL_TASK;
|
||||
task->queryString = queryString->data;
|
||||
task->taskPlacementList = placementList;
|
||||
task->anchorShardId = shardInterval->shardId;
|
||||
task->replicationModel = distTable->replicationModel;
|
||||
|
||||
job = CitusMakeNode(Job);
|
||||
job->jobId = UniqueJobId();
|
||||
job->jobQuery = query;
|
||||
job->taskList = list_make1(task);
|
||||
|
||||
distributedPlan = CitusMakeNode(DistributedPlan);
|
||||
distributedPlan->workerJob = job;
|
||||
distributedPlan->masterQuery = NULL;
|
||||
distributedPlan->routerExecutable = true;
|
||||
distributedPlan->hasReturning = false;
|
||||
|
||||
/* worker will take care of any necessary locking, treat query as read-only */
|
||||
distributedPlan->modLevel = ROW_MODIFY_READONLY;
|
||||
|
||||
return distributedPlan;
|
||||
}
|
|
@ -638,6 +638,11 @@ IsMultiStatementTransaction(void)
|
|||
/* in a BEGIN...END block */
|
||||
return true;
|
||||
}
|
||||
else if (DoBlockLevel > 0)
|
||||
{
|
||||
/* in (a transaction within) a do block */
|
||||
return true;
|
||||
}
|
||||
else if (StoredProcedureLevel > 0)
|
||||
{
|
||||
/* in (a transaction within) a stored procedure */
|
||||
|
|
|
@ -3143,18 +3143,22 @@ GetLocalGroupId(void)
|
|||
tupleDescriptor, &isNull);
|
||||
|
||||
groupId = DatumGetInt32(groupIdDatum);
|
||||
|
||||
/* set the local cache variable */
|
||||
LocalGroupId = groupId;
|
||||
}
|
||||
else
|
||||
{
|
||||
elog(ERROR, "could not find any entries in pg_dist_local_group");
|
||||
/*
|
||||
* Upgrade is happening. When upgrading postgres, pg_dist_local_group is
|
||||
* temporarily empty before citus_finish_pg_upgrade() finishes execution.
|
||||
*/
|
||||
groupId = GROUP_ID_UPGRADING;
|
||||
}
|
||||
|
||||
systable_endscan(scanDescriptor);
|
||||
heap_close(pgDistLocalGroupId, AccessShareLock);
|
||||
|
||||
/* set the local cache variable */
|
||||
LocalGroupId = groupId;
|
||||
|
||||
return groupId;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
/*
|
||||
* function_call_delegation.h
|
||||
* Declarations for public functions and variables used to delegate
|
||||
* function calls to worker nodes.
|
||||
*
|
||||
* Copyright (c), Citus Data, Inc.
|
||||
*/
|
||||
|
||||
#ifndef FUNCTION_CALL_DELEGATION_H
|
||||
#define FUNCTION_CALL_DELEGATION_H
|
||||
|
||||
#include "postgres.h"
|
||||
#include "distributed/multi_physical_planner.h"
|
||||
|
||||
|
||||
DistributedPlan * TryToDelegateFunctionCall(Query *query);
|
||||
|
||||
|
||||
#endif /* FUNCTION_CALL_DELEGATION_H */
|
|
@ -29,6 +29,14 @@ typedef enum
|
|||
} ReadFromSecondariesType;
|
||||
extern int ReadFromSecondaries;
|
||||
|
||||
|
||||
/*
|
||||
* While upgrading pg_dist_local_group can be empty temporarily, in that
|
||||
* case we use GROUP_ID_UPGRADING as the local group id to communicate
|
||||
* this to other functions.
|
||||
*/
|
||||
#define GROUP_ID_UPGRADING -2
|
||||
|
||||
/*
|
||||
* Representation of a table's metadata that is frequently used for
|
||||
* distributed execution. Cached.
|
||||
|
|
|
@ -91,6 +91,9 @@ extern dlist_head InProgressTransactions;
|
|||
/* number of nested stored procedure call levels we are currently in */
|
||||
extern int StoredProcedureLevel;
|
||||
|
||||
/* number of nested DO block levels we are currently in */
|
||||
extern int DoBlockLevel;
|
||||
|
||||
/* number of nested function call levels we are currently in */
|
||||
extern int FunctionCallLevel;
|
||||
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
-- Test passing off CALL to mx workers
|
||||
create schema multi_mx_call;
|
||||
set search_path to multi_mx_call, public;
|
||||
-- Create worker-local tables to test procedure calls were routed
|
||||
set citus.shard_replication_factor to 2;
|
||||
set citus.replication_model to 'statement';
|
||||
|
@ -13,21 +15,6 @@ select create_distributed_table('mx_call_dist_table_replica', 'id');
|
|||
insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5);
|
||||
set citus.shard_replication_factor to 1;
|
||||
set citus.replication_model to 'streaming';
|
||||
create schema multi_mx_call;
|
||||
set search_path to multi_mx_call, public;
|
||||
--
|
||||
-- Utility UDFs
|
||||
--
|
||||
-- 1. Marks the given procedure as colocated with the given table.
|
||||
-- 2. Marks the argument index with which we route the procedure.
|
||||
CREATE PROCEDURE colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid;
|
||||
END;$$;
|
||||
--
|
||||
-- Create tables and procedures we want to use in tests
|
||||
--
|
||||
|
@ -96,6 +83,19 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
|||
F | S
|
||||
(1 row)
|
||||
|
||||
-- Same for unqualified names
|
||||
call mx_call_proc(2, 0);
|
||||
y
|
||||
----
|
||||
29
|
||||
(1 row)
|
||||
|
||||
call mx_call_proc_custom_types('S', 'A');
|
||||
x | y
|
||||
---+---
|
||||
F | S
|
||||
(1 row)
|
||||
|
||||
-- Mark both procedures as distributed ...
|
||||
select create_distributed_function('mx_call_proc(int,int)');
|
||||
create_distributed_function
|
||||
|
@ -114,10 +114,10 @@ select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_ca
|
|||
SET client_min_messages TO DEBUG1;
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
DEBUG: stored procedure does not have co-located tables
|
||||
DEBUG: generating subplan 8_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
DEBUG: generating subplan 10_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 8 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
y
|
||||
|
@ -133,8 +133,18 @@ DEBUG: stored procedure does not have co-located tables
|
|||
(1 row)
|
||||
|
||||
-- Mark them as colocated with a table. Now we should route them to workers.
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
|
||||
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
DEBUG: pushing down the procedure
|
||||
y
|
||||
|
@ -149,14 +159,28 @@ DEBUG: pushing down the procedure
|
|||
S | S
|
||||
(1 row)
|
||||
|
||||
call mx_call_proc(2, 0);
|
||||
DEBUG: pushing down the procedure
|
||||
y
|
||||
----
|
||||
28
|
||||
(1 row)
|
||||
|
||||
call mx_call_proc_custom_types('S', 'A');
|
||||
DEBUG: pushing down the procedure
|
||||
x | y
|
||||
---+---
|
||||
S | S
|
||||
(1 row)
|
||||
|
||||
-- We don't allow distributing calls inside transactions
|
||||
begin;
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
DEBUG: cannot push down CALL in multi-statement transaction
|
||||
DEBUG: generating subplan 10_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
DEBUG: generating subplan 12_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
DEBUG: Plan 12 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
y
|
||||
|
@ -179,21 +203,12 @@ DEBUG: stored procedure does not have co-located tables
|
|||
|
||||
-- Make sure we do bounds checking on distributed argument index
|
||||
-- This also tests that we have cache invalidation for pg_dist_object updates
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
DEBUG: cannot push down invalid distribution_argument_index
|
||||
DEBUG: generating subplan 12_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 12 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
y
|
||||
----
|
||||
29
|
||||
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
DEBUG: cannot push down invalid distribution_argument_index
|
||||
DEBUG: generating subplan 14_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
|
@ -207,10 +222,14 @@ PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
|||
29
|
||||
(1 row)
|
||||
|
||||
-- We don't currently support colocating with reference tables
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1);
|
||||
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
DEBUG: cannot push down CALL for reference tables
|
||||
DEBUG: cannot push down invalid distribution_argument_index
|
||||
DEBUG: generating subplan 17_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
|
@ -222,10 +241,15 @@ PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
|||
29
|
||||
(1 row)
|
||||
|
||||
-- We don't currently support colocating with replicated tables
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1);
|
||||
-- We don't currently support colocating with reference tables
|
||||
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
DEBUG: cannot push down CALL for replicated distributed tables
|
||||
DEBUG: cannot push down CALL for reference tables
|
||||
DEBUG: generating subplan 19_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
|
@ -237,20 +261,45 @@ PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
|||
29
|
||||
(1 row)
|
||||
|
||||
-- We don't currently support colocating with replicated tables
|
||||
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
DEBUG: cannot push down CALL for replicated distributed tables
|
||||
DEBUG: generating subplan 21_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 21 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('21_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
y
|
||||
----
|
||||
29
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO NOTICE;
|
||||
drop table mx_call_dist_table_replica;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
|
||||
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Test that we handle transactional constructs correctly inside a procedure
|
||||
-- that is routed to the workers.
|
||||
CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
INSERT INTO mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
|
||||
INSERT INTO multi_mx_call.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
|
||||
COMMIT;
|
||||
UPDATE mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
|
||||
UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
|
||||
ROLLBACK;
|
||||
-- Now do the final update!
|
||||
UPDATE mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
|
||||
UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
|
||||
END;$$;
|
||||
-- before distribution ...
|
||||
CALL multi_mx_call.mx_call_proc_tx(10);
|
||||
|
@ -265,9 +314,6 @@ DETAIL: A distributed function is created. To make sure subsequent commands see
|
|||
|
||||
CALL multi_mx_call.mx_call_proc_tx(20);
|
||||
DEBUG: pushing down the procedure
|
||||
ERROR: relation "mx_call_dist_table_1" does not exist
|
||||
CONTEXT: while executing command on localhost:57637
|
||||
PL/pgSQL function multi_mx_call.mx_call_proc_tx(integer) line 3 at SQL statement
|
||||
SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val;
|
||||
id | val
|
||||
----+-----
|
||||
|
@ -278,7 +324,9 @@ SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val;
|
|||
9 | 2
|
||||
10 | -2
|
||||
11 | 3
|
||||
(7 rows)
|
||||
20 | -2
|
||||
21 | 3
|
||||
(9 rows)
|
||||
|
||||
-- Test that we properly propagate errors raised from procedures.
|
||||
CREATE PROCEDURE mx_call_proc_raise(x int) LANGUAGE plpgsql AS $$
|
||||
|
@ -316,10 +364,10 @@ select stop_metadata_sync_to_node('localhost', :worker_2_port);
|
|||
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
DEBUG: there is no worker node with metadata
|
||||
DEBUG: generating subplan 25_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
DEBUG: generating subplan 27_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 25 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('25_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
DEBUG: Plan 27 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('27_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
y
|
||||
|
@ -340,6 +388,11 @@ select start_metadata_sync_to_node('localhost', :worker_2_port);
|
|||
|
||||
(1 row)
|
||||
|
||||
-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make
|
||||
-- worker backend caches inconsistent. Reconnect to coordinator to use
|
||||
-- new worker connections, hence new backends.
|
||||
\c - - - :master_port
|
||||
SET search_path to multi_mx_call, public;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
--
|
||||
-- Test non-const parameter values
|
||||
|
@ -357,10 +410,10 @@ DETAIL: A distributed function is created. To make sure subsequent commands see
|
|||
-- non-const distribution parameters cannot be pushed down
|
||||
call multi_mx_call.mx_call_proc(2, mx_call_add(3, 4));
|
||||
DEBUG: distribution argument value must be a constant
|
||||
DEBUG: generating subplan 27_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
DEBUG: generating subplan 1_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 27 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('27_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
DEBUG: Plan 1 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('1_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
y
|
||||
|
@ -379,10 +432,10 @@ DEBUG: pushing down the procedure
|
|||
-- volatile parameter cannot be pushed down
|
||||
call multi_mx_call.mx_call_proc(floor(random())::int, 2);
|
||||
DEBUG: arguments in a distributed stored procedure must be constant expressions
|
||||
DEBUG: generating subplan 29_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
DEBUG: generating subplan 3_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 29 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('29_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
DEBUG: Plan 3 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('3_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
||||
y
|
||||
|
@ -393,4 +446,4 @@ PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
|
|||
reset client_min_messages;
|
||||
\set VERBOSITY terse
|
||||
drop schema multi_mx_call cascade;
|
||||
NOTICE: drop cascades to 10 other objects
|
||||
NOTICE: drop cascades to 9 other objects
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
-- Test passing off CALL to mx workers
|
||||
create schema multi_mx_call;
|
||||
set search_path to multi_mx_call, public;
|
||||
-- Create worker-local tables to test procedure calls were routed
|
||||
set citus.shard_replication_factor to 2;
|
||||
set citus.replication_model to 'statement';
|
||||
|
@ -13,24 +15,6 @@ select create_distributed_table('mx_call_dist_table_replica', 'id');
|
|||
insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5);
|
||||
set citus.shard_replication_factor to 1;
|
||||
set citus.replication_model to 'streaming';
|
||||
create schema multi_mx_call;
|
||||
set search_path to multi_mx_call, public;
|
||||
--
|
||||
-- Utility UDFs
|
||||
--
|
||||
-- 1. Marks the given procedure as colocated with the given table.
|
||||
-- 2. Marks the argument index with which we route the procedure.
|
||||
CREATE PROCEDURE colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid;
|
||||
END;$$;
|
||||
ERROR: syntax error at or near "PROCEDURE"
|
||||
LINE 1: CREATE PROCEDURE colocate_proc_with_table(procname text, tab...
|
||||
^
|
||||
--
|
||||
-- Create tables and procedures we want to use in tests
|
||||
--
|
||||
|
@ -101,6 +85,15 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
|||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
^
|
||||
-- Same for unqualified names
|
||||
call mx_call_proc(2, 0);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call mx_call_proc(2, 0);
|
||||
^
|
||||
call mx_call_proc_custom_types('S', 'A');
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call mx_call_proc_custom_types('S', 'A');
|
||||
^
|
||||
-- Mark both procedures as distributed ...
|
||||
select create_distributed_function('mx_call_proc(int,int)');
|
||||
ERROR: function "mx_call_proc(int,int)" does not exist
|
||||
|
@ -122,14 +115,18 @@ ERROR: syntax error at or near "call"
|
|||
LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
^
|
||||
-- Mark them as colocated with a table. Now we should route them to workers.
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ...
|
||||
^
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc_cu...
|
||||
^
|
||||
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
|
||||
|
@ -138,6 +135,14 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
|||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
^
|
||||
call mx_call_proc(2, 0);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call mx_call_proc(2, 0);
|
||||
^
|
||||
call mx_call_proc_custom_types('S', 'A');
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call mx_call_proc_custom_types('S', 'A');
|
||||
^
|
||||
-- We don't allow distributing calls inside transactions
|
||||
begin;
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
|
@ -156,36 +161,44 @@ LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
|||
^
|
||||
-- Make sure we do bounds checking on distributed argument index
|
||||
-- This also tests that we have cache invalidation for pg_dist_object updates
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ...
|
||||
^
|
||||
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
|
||||
^
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ...
|
||||
^
|
||||
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
|
||||
^
|
||||
-- We don't currently support colocating with reference tables
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ...
|
||||
^
|
||||
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
|
||||
^
|
||||
-- We don't currently support colocating with replicated tables
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ...
|
||||
^
|
||||
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
|
||||
|
@ -193,20 +206,22 @@ LINE 1: call multi_mx_call.mx_call_proc(2, 0);
|
|||
SET client_min_messages TO NOTICE;
|
||||
drop table mx_call_dist_table_replica;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
|
||||
ERROR: syntax error at or near "call"
|
||||
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ...
|
||||
^
|
||||
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Test that we handle transactional constructs correctly inside a procedure
|
||||
-- that is routed to the workers.
|
||||
CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
INSERT INTO mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
|
||||
INSERT INTO multi_mx_call.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
|
||||
COMMIT;
|
||||
UPDATE mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
|
||||
UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
|
||||
ROLLBACK;
|
||||
-- Now do the final update!
|
||||
UPDATE mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
|
||||
UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
|
||||
END;$$;
|
||||
ERROR: syntax error at or near "PROCEDURE"
|
||||
LINE 1: CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS ...
|
||||
|
@ -282,6 +297,11 @@ select start_metadata_sync_to_node('localhost', :worker_2_port);
|
|||
|
||||
(1 row)
|
||||
|
||||
-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make
|
||||
-- worker backend caches inconsistent. Reconnect to coordinator to use
|
||||
-- new worker connections, hence new backends.
|
||||
\c - - - :master_port
|
||||
SET search_path to multi_mx_call, public;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
--
|
||||
-- Test non-const parameter values
|
||||
|
|
|
@ -0,0 +1,539 @@
|
|||
-- Test passing off function call to mx workers
|
||||
CREATE SCHEMA multi_mx_function_call_delegation;
|
||||
SET search_path TO multi_mx_function_call_delegation, public;
|
||||
SET citus.shard_replication_factor TO 2;
|
||||
SET citus.replication_model TO 'statement';
|
||||
-- This table requires specific settings, create before getting into things
|
||||
create table mx_call_dist_table_replica(id int, val int);
|
||||
select create_distributed_table('mx_call_dist_table_replica', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5);
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
--
|
||||
-- Create tables and functions we want to use in tests
|
||||
--
|
||||
create table mx_call_dist_table_1(id int, val int);
|
||||
select create_distributed_table('mx_call_dist_table_1', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
insert into mx_call_dist_table_1 values (3,1),(4,5),(9,2),(6,5),(3,5);
|
||||
create table mx_call_dist_table_2(id int, val int);
|
||||
select create_distributed_table('mx_call_dist_table_2', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
insert into mx_call_dist_table_2 values (1,1),(1,2),(2,2),(3,3),(3,4);
|
||||
create table mx_call_dist_table_ref(id int, val int);
|
||||
select create_reference_table('mx_call_dist_table_ref');
|
||||
create_reference_table
|
||||
------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
insert into mx_call_dist_table_ref values (2,7),(1,8),(2,8),(1,8),(2,8);
|
||||
create type mx_call_enum as enum ('A', 'S', 'D', 'F');
|
||||
create table mx_call_dist_table_enum(id int, key mx_call_enum);
|
||||
select create_distributed_table('mx_call_dist_table_enum', 'key');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F');
|
||||
CREATE FUNCTION squares(int) RETURNS SETOF RECORD
|
||||
AS $$ SELECT i, i * i FROM generate_series(1, $1) i $$
|
||||
LANGUAGE SQL;
|
||||
CREATE FUNCTION mx_call_func(x int, INOUT y int)
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
-- groupid is 0 in coordinator and non-zero in workers, so by using it here
|
||||
-- we make sure the function is being executed in the worker.
|
||||
y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group);
|
||||
-- we also make sure that we can run distributed queries in the functions
|
||||
-- that are routed to the workers.
|
||||
y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id);
|
||||
END;$$;
|
||||
-- create another function which verifies:
|
||||
-- 1. we work fine with multiple return columns
|
||||
-- 2. we work fine in combination with custom types
|
||||
CREATE FUNCTION mx_call_func_custom_types(INOUT x mx_call_enum, INOUT y mx_call_enum)
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
y := x;
|
||||
x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
|
||||
END;$$;
|
||||
-- Test that undistributed functions have no issue executing
|
||||
select multi_mx_function_call_delegation.mx_call_func(2, 0);
|
||||
mx_call_func
|
||||
--------------
|
||||
29
|
||||
(1 row)
|
||||
|
||||
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
|
||||
mx_call_func_custom_types
|
||||
---------------------------
|
||||
(F,S)
|
||||
(1 row)
|
||||
|
||||
select squares(4);
|
||||
squares
|
||||
---------
|
||||
(1,1)
|
||||
(2,4)
|
||||
(3,9)
|
||||
(4,16)
|
||||
(4 rows)
|
||||
|
||||
-- Same for unqualified name
|
||||
select mx_call_func(2, 0);
|
||||
mx_call_func
|
||||
--------------
|
||||
29
|
||||
(1 row)
|
||||
|
||||
-- Mark both functions as distributed ...
|
||||
select create_distributed_function('mx_call_func(int,int)');
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_call_enum)');
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select create_distributed_function('squares(int)');
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- We still don't route them to the workers, because they aren't
|
||||
-- colocated with any distributed tables.
|
||||
SET client_min_messages TO DEBUG1;
|
||||
select mx_call_func(2, 0);
|
||||
DEBUG: function does not have co-located tables
|
||||
DEBUG: generating subplan 10_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
mx_call_func
|
||||
--------------
|
||||
29
|
||||
(1 row)
|
||||
|
||||
select mx_call_func_custom_types('S', 'A');
|
||||
DEBUG: function does not have co-located tables
|
||||
mx_call_func_custom_types
|
||||
---------------------------
|
||||
(F,S)
|
||||
(1 row)
|
||||
|
||||
-- Mark them as colocated with a table. Now we should route them to workers.
|
||||
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select colocate_proc_with_table('mx_call_func_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select mx_call_func(2, 0);
|
||||
DEBUG: pushing down the function call
|
||||
mx_call_func
|
||||
--------------
|
||||
28
|
||||
(1 row)
|
||||
|
||||
select mx_call_func_custom_types('S', 'A');
|
||||
DEBUG: pushing down the function call
|
||||
mx_call_func_custom_types
|
||||
---------------------------
|
||||
(S,S)
|
||||
(1 row)
|
||||
|
||||
select squares(4);
|
||||
DEBUG: pushing down the function call
|
||||
ERROR: input of anonymous composite types is not implemented
|
||||
select multi_mx_function_call_delegation.mx_call_func(2, 0);
|
||||
DEBUG: pushing down the function call
|
||||
mx_call_func
|
||||
--------------
|
||||
28
|
||||
(1 row)
|
||||
|
||||
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
|
||||
DEBUG: pushing down the function call
|
||||
mx_call_func_custom_types
|
||||
---------------------------
|
||||
(S,S)
|
||||
(1 row)
|
||||
|
||||
-- We don't allow distributing calls inside transactions
|
||||
begin;
|
||||
select mx_call_func(2, 0);
|
||||
DEBUG: not pushing down function calls in a multi-statement transaction
|
||||
DEBUG: generating subplan 12_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 12 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
mx_call_func
|
||||
--------------
|
||||
29
|
||||
(1 row)
|
||||
|
||||
commit;
|
||||
-- Drop the table colocated with mx_call_func_custom_types. Now it shouldn't
|
||||
-- be routed to workers anymore.
|
||||
SET client_min_messages TO NOTICE;
|
||||
drop table mx_call_dist_table_enum;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
select mx_call_func_custom_types('S', 'A');
|
||||
DEBUG: function does not have co-located tables
|
||||
mx_call_func_custom_types
|
||||
---------------------------
|
||||
(F,S)
|
||||
(1 row)
|
||||
|
||||
-- Make sure we do bounds checking on distributed argument index
|
||||
-- This also tests that we have cache invalidation for pg_dist_object updates
|
||||
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, -1);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select mx_call_func(2, 0);
|
||||
DEBUG: function call does not have a distribution argument
|
||||
DEBUG: generating subplan 14_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 14 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('14_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
mx_call_func
|
||||
--------------
|
||||
29
|
||||
(1 row)
|
||||
|
||||
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 2);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select mx_call_func(2, 0);
|
||||
DEBUG: function call does not have a distribution argument
|
||||
DEBUG: generating subplan 17_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 17 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('17_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
mx_call_func
|
||||
--------------
|
||||
29
|
||||
(1 row)
|
||||
|
||||
-- We don't currently support colocating with reference tables
|
||||
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_ref'::regclass, 1);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select mx_call_func(2, 0);
|
||||
DEBUG: cannnot push down function call for reference tables
|
||||
DEBUG: generating subplan 19_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 19 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('19_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
mx_call_func
|
||||
--------------
|
||||
29
|
||||
(1 row)
|
||||
|
||||
-- We don't currently support colocating with replicated tables
|
||||
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_replica'::regclass, 1);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select mx_call_func(2, 0);
|
||||
DEBUG: cannot push down function call for replicated distributed tables
|
||||
DEBUG: generating subplan 21_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 21 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('21_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
mx_call_func
|
||||
--------------
|
||||
29
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO NOTICE;
|
||||
drop table mx_call_dist_table_replica;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1);
|
||||
colocate_proc_with_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Test table returning functions.
|
||||
CREATE FUNCTION mx_call_func_tbl(x int)
|
||||
RETURNS TABLE (p0 int, p1 int)
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
|
||||
UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
|
||||
UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
|
||||
RETURN QUERY
|
||||
SELECT id, val
|
||||
FROM multi_mx_function_call_delegation.mx_call_dist_table_1 t
|
||||
WHERE id >= x
|
||||
ORDER BY 1, 2;
|
||||
END;$$;
|
||||
-- before distribution ...
|
||||
select mx_call_func_tbl(10);
|
||||
mx_call_func_tbl
|
||||
------------------
|
||||
(10,-1)
|
||||
(11,4)
|
||||
(2 rows)
|
||||
|
||||
-- after distribution ...
|
||||
select create_distributed_function('mx_call_func_tbl(int)', '$1', 'mx_call_dist_table_1');
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select mx_call_func_tbl(20);
|
||||
DEBUG: pushing down the function call
|
||||
mx_call_func_tbl
|
||||
------------------
|
||||
(20,-1)
|
||||
(21,4)
|
||||
(2 rows)
|
||||
|
||||
-- Test that we properly propagate errors raised from procedures.
|
||||
CREATE FUNCTION mx_call_func_raise(x int)
|
||||
RETURNS void LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
RAISE WARNING 'warning';
|
||||
RAISE EXCEPTION 'error';
|
||||
END;$$;
|
||||
select create_distributed_function('mx_call_func_raise(int)', '$1', 'mx_call_dist_table_1');
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select mx_call_func_raise(2);
|
||||
DEBUG: pushing down the function call
|
||||
DEBUG: warning
|
||||
DETAIL: WARNING from localhost:57638
|
||||
ERROR: error
|
||||
CONTEXT: while executing command on localhost:57638
|
||||
PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_raise(integer) line 4 at RAISE
|
||||
-- Test that we don't propagate to non-metadata worker nodes
|
||||
select stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
stop_metadata_sync_to_node
|
||||
----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select stop_metadata_sync_to_node('localhost', :worker_2_port);
|
||||
stop_metadata_sync_to_node
|
||||
----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select mx_call_func(2, 0);
|
||||
DEBUG: the worker node does not have metadata
|
||||
DEBUG: generating subplan 27_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 27 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('27_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
mx_call_func
|
||||
--------------
|
||||
29
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO NOTICE;
|
||||
select start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
start_metadata_sync_to_node
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select start_metadata_sync_to_node('localhost', :worker_2_port);
|
||||
start_metadata_sync_to_node
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make
|
||||
-- worker backend caches inconsistent. Reconnect to coordinator to use
|
||||
-- new worker connections, hence new backends.
|
||||
\c - - - :master_port
|
||||
SET search_path to multi_mx_function_call_delegation, public;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
--
|
||||
-- Test non-const parameter values
|
||||
--
|
||||
CREATE FUNCTION mx_call_add(int, int) RETURNS int
|
||||
AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE;
|
||||
SELECT create_distributed_function('mx_call_add(int,int)', '$1');
|
||||
DEBUG: switching to sequential query execution mode
|
||||
DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
|
||||
create_distributed_function
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- subquery parameters cannot be pushed down
|
||||
select mx_call_func((select x + 1 from mx_call_add(3, 4) x), 2);
|
||||
DEBUG: arguments in a distributed function must not contain subqueries
|
||||
DEBUG: generating subplan 1_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 1 query after replacing subqueries and CTEs: SELECT (9 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('1_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
mx_call_func
|
||||
--------------
|
||||
35
|
||||
(1 row)
|
||||
|
||||
-- volatile parameter cannot be pushed down
|
||||
select mx_call_func(floor(random())::int, 2);
|
||||
DEBUG: arguments in a distributed function must be constant expressions
|
||||
DEBUG: generating subplan 3_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 3 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('3_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
mx_call_func
|
||||
--------------
|
||||
27
|
||||
(1 row)
|
||||
|
||||
-- test forms we don't distribute
|
||||
select * from mx_call_func(2, 0);
|
||||
DEBUG: generating subplan 5_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 5 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('5_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
y
|
||||
----
|
||||
29
|
||||
(1 row)
|
||||
|
||||
select mx_call_func(2, 0) from mx_call_dist_table_1;
|
||||
mx_call_func
|
||||
--------------
|
||||
28
|
||||
28
|
||||
28
|
||||
28
|
||||
28
|
||||
28
|
||||
28
|
||||
28
|
||||
28
|
||||
(9 rows)
|
||||
|
||||
select mx_call_func(2, 0) where mx_call_func(0, 2) = 0;
|
||||
DEBUG: generating subplan 8_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 8 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
mx_call_func
|
||||
--------------
|
||||
(0 rows)
|
||||
|
||||
select mx_call_func(2, 0), mx_call_func(0, 2);
|
||||
DEBUG: generating subplan 10_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
DEBUG: generating subplan 13_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
DEBUG: Plan 13 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('13_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
|
||||
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
|
||||
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
|
||||
mx_call_func | mx_call_func
|
||||
--------------+--------------
|
||||
29 | 27
|
||||
(1 row)
|
||||
|
||||
DO $$ BEGIN perform mx_call_func_tbl(40); END; $$;
|
||||
DEBUG: not pushing down function calls in a multi-statement transaction
|
||||
CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)"
|
||||
PL/pgSQL function inline_code_block line 1 at PERFORM
|
||||
SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val;
|
||||
id | val
|
||||
----+-----
|
||||
40 | -1
|
||||
41 | 4
|
||||
(2 rows)
|
||||
|
||||
-- Prepared statements
|
||||
PREPARE call_plan (int, int) AS SELECT mx_call_func($1, $2);
|
||||
EXECUTE call_plan(2, 0);
|
||||
DEBUG: pushing down the function call
|
||||
mx_call_func
|
||||
--------------
|
||||
28
|
||||
(1 row)
|
||||
|
||||
RESET client_min_messages;
|
||||
\set VERBOSITY terse
|
||||
DROP SCHEMA multi_mx_function_call_delegation CASCADE;
|
||||
NOTICE: drop cascades to 10 other objects
|
|
@ -140,3 +140,13 @@ BEGIN
|
|||
EXECUTE p_sql;
|
||||
PERFORM run_command_on_workers(p_sql);
|
||||
END;$$;
|
||||
-- 1. Marks the given procedure as colocated with the given table.
|
||||
-- 2. Marks the argument index with which we route the procedure.
|
||||
CREATE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
|
||||
RETURNS void LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid;
|
||||
END;$$;
|
||||
|
|
|
@ -33,6 +33,7 @@ test: multi_mx_repartition_udt_prepare mx_foreign_key_to_reference_table
|
|||
test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
|
||||
test: multi_mx_metadata
|
||||
test: multi_mx_call
|
||||
test: multi_mx_function_call_delegation
|
||||
test: multi_mx_modifications local_shard_execution
|
||||
test: multi_mx_transaction_recovery
|
||||
test: multi_mx_modifying_xacts
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
-- Test passing off CALL to mx workers
|
||||
|
||||
create schema multi_mx_call;
|
||||
set search_path to multi_mx_call, public;
|
||||
|
||||
-- Create worker-local tables to test procedure calls were routed
|
||||
|
||||
set citus.shard_replication_factor to 2;
|
||||
|
@ -13,25 +16,6 @@ insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5);
|
|||
set citus.shard_replication_factor to 1;
|
||||
set citus.replication_model to 'streaming';
|
||||
|
||||
create schema multi_mx_call;
|
||||
set search_path to multi_mx_call, public;
|
||||
|
||||
--
|
||||
-- Utility UDFs
|
||||
--
|
||||
|
||||
-- 1. Marks the given procedure as colocated with the given table.
|
||||
-- 2. Marks the argument index with which we route the procedure.
|
||||
CREATE PROCEDURE colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid;
|
||||
END;$$;
|
||||
|
||||
|
||||
--
|
||||
-- Create tables and procedures we want to use in tests
|
||||
--
|
||||
|
@ -78,6 +62,10 @@ END;$$;
|
|||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
|
||||
-- Same for unqualified names
|
||||
call mx_call_proc(2, 0);
|
||||
call mx_call_proc_custom_types('S', 'A');
|
||||
|
||||
-- Mark both procedures as distributed ...
|
||||
select create_distributed_function('mx_call_proc(int,int)');
|
||||
select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_call_enum)');
|
||||
|
@ -89,10 +77,13 @@ call multi_mx_call.mx_call_proc(2, 0);
|
|||
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
|
||||
-- Mark them as colocated with a table. Now we should route them to workers.
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
|
||||
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
|
||||
select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
|
||||
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
||||
call mx_call_proc(2, 0);
|
||||
call mx_call_proc_custom_types('S', 'A');
|
||||
|
||||
-- We don't allow distributing calls inside transactions
|
||||
begin;
|
||||
|
@ -108,34 +99,34 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A');
|
|||
|
||||
-- Make sure we do bounds checking on distributed argument index
|
||||
-- This also tests that we have cache invalidation for pg_dist_object updates
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1);
|
||||
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2);
|
||||
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
|
||||
-- We don't currently support colocating with reference tables
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1);
|
||||
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
|
||||
-- We don't currently support colocating with replicated tables
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1);
|
||||
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1);
|
||||
call multi_mx_call.mx_call_proc(2, 0);
|
||||
SET client_min_messages TO NOTICE;
|
||||
drop table mx_call_dist_table_replica;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
|
||||
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
|
||||
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
|
||||
|
||||
-- Test that we handle transactional constructs correctly inside a procedure
|
||||
-- that is routed to the workers.
|
||||
CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
INSERT INTO mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
|
||||
INSERT INTO multi_mx_call.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
|
||||
COMMIT;
|
||||
UPDATE mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
|
||||
UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
|
||||
ROLLBACK;
|
||||
-- Now do the final update!
|
||||
UPDATE mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
|
||||
UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
|
||||
END;$$;
|
||||
|
||||
-- before distribution ...
|
||||
|
@ -162,6 +153,12 @@ call multi_mx_call.mx_call_proc(2, 0);
|
|||
SET client_min_messages TO NOTICE;
|
||||
select start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
select start_metadata_sync_to_node('localhost', :worker_2_port);
|
||||
|
||||
-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make
|
||||
-- worker backend caches inconsistent. Reconnect to coordinator to use
|
||||
-- new worker connections, hence new backends.
|
||||
\c - - - :master_port
|
||||
SET search_path to multi_mx_call, public;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
|
||||
--
|
||||
|
|
|
@ -0,0 +1,198 @@
|
|||
-- Test passing off function call to mx workers
|
||||
|
||||
CREATE SCHEMA multi_mx_function_call_delegation;
|
||||
SET search_path TO multi_mx_function_call_delegation, public;
|
||||
|
||||
SET citus.shard_replication_factor TO 2;
|
||||
SET citus.replication_model TO 'statement';
|
||||
|
||||
-- This table requires specific settings, create before getting into things
|
||||
create table mx_call_dist_table_replica(id int, val int);
|
||||
select create_distributed_table('mx_call_dist_table_replica', 'id');
|
||||
insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5);
|
||||
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
|
||||
--
|
||||
-- Create tables and functions we want to use in tests
|
||||
--
|
||||
create table mx_call_dist_table_1(id int, val int);
|
||||
select create_distributed_table('mx_call_dist_table_1', 'id');
|
||||
insert into mx_call_dist_table_1 values (3,1),(4,5),(9,2),(6,5),(3,5);
|
||||
|
||||
create table mx_call_dist_table_2(id int, val int);
|
||||
select create_distributed_table('mx_call_dist_table_2', 'id');
|
||||
insert into mx_call_dist_table_2 values (1,1),(1,2),(2,2),(3,3),(3,4);
|
||||
|
||||
create table mx_call_dist_table_ref(id int, val int);
|
||||
select create_reference_table('mx_call_dist_table_ref');
|
||||
insert into mx_call_dist_table_ref values (2,7),(1,8),(2,8),(1,8),(2,8);
|
||||
|
||||
create type mx_call_enum as enum ('A', 'S', 'D', 'F');
|
||||
create table mx_call_dist_table_enum(id int, key mx_call_enum);
|
||||
select create_distributed_table('mx_call_dist_table_enum', 'key');
|
||||
insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F');
|
||||
|
||||
CREATE FUNCTION squares(int) RETURNS SETOF RECORD
|
||||
AS $$ SELECT i, i * i FROM generate_series(1, $1) i $$
|
||||
LANGUAGE SQL;
|
||||
|
||||
CREATE FUNCTION mx_call_func(x int, INOUT y int)
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
-- groupid is 0 in coordinator and non-zero in workers, so by using it here
|
||||
-- we make sure the function is being executed in the worker.
|
||||
y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group);
|
||||
-- we also make sure that we can run distributed queries in the functions
|
||||
-- that are routed to the workers.
|
||||
y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id);
|
||||
END;$$;
|
||||
|
||||
-- create another function which verifies:
|
||||
-- 1. we work fine with multiple return columns
|
||||
-- 2. we work fine in combination with custom types
|
||||
CREATE FUNCTION mx_call_func_custom_types(INOUT x mx_call_enum, INOUT y mx_call_enum)
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
y := x;
|
||||
x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
|
||||
END;$$;
|
||||
|
||||
-- Test that undistributed functions have no issue executing
|
||||
select multi_mx_function_call_delegation.mx_call_func(2, 0);
|
||||
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
|
||||
select squares(4);
|
||||
|
||||
-- Same for unqualified name
|
||||
select mx_call_func(2, 0);
|
||||
|
||||
-- Mark both functions as distributed ...
|
||||
select create_distributed_function('mx_call_func(int,int)');
|
||||
select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_call_enum)');
|
||||
select create_distributed_function('squares(int)');
|
||||
|
||||
-- We still don't route them to the workers, because they aren't
|
||||
-- colocated with any distributed tables.
|
||||
SET client_min_messages TO DEBUG1;
|
||||
select mx_call_func(2, 0);
|
||||
select mx_call_func_custom_types('S', 'A');
|
||||
|
||||
-- Mark them as colocated with a table. Now we should route them to workers.
|
||||
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1);
|
||||
select colocate_proc_with_table('mx_call_func_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
|
||||
select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0);
|
||||
|
||||
select mx_call_func(2, 0);
|
||||
select mx_call_func_custom_types('S', 'A');
|
||||
select squares(4);
|
||||
select multi_mx_function_call_delegation.mx_call_func(2, 0);
|
||||
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
|
||||
|
||||
-- We don't allow distributing calls inside transactions
|
||||
begin;
|
||||
select mx_call_func(2, 0);
|
||||
commit;
|
||||
|
||||
-- Drop the table colocated with mx_call_func_custom_types. Now it shouldn't
|
||||
-- be routed to workers anymore.
|
||||
SET client_min_messages TO NOTICE;
|
||||
drop table mx_call_dist_table_enum;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
select mx_call_func_custom_types('S', 'A');
|
||||
|
||||
-- Make sure we do bounds checking on distributed argument index
|
||||
-- This also tests that we have cache invalidation for pg_dist_object updates
|
||||
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, -1);
|
||||
select mx_call_func(2, 0);
|
||||
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 2);
|
||||
select mx_call_func(2, 0);
|
||||
|
||||
-- We don't currently support colocating with reference tables
|
||||
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_ref'::regclass, 1);
|
||||
select mx_call_func(2, 0);
|
||||
|
||||
-- We don't currently support colocating with replicated tables
|
||||
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_replica'::regclass, 1);
|
||||
select mx_call_func(2, 0);
|
||||
SET client_min_messages TO NOTICE;
|
||||
drop table mx_call_dist_table_replica;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
|
||||
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1);
|
||||
|
||||
-- Test table returning functions.
|
||||
CREATE FUNCTION mx_call_func_tbl(x int)
|
||||
RETURNS TABLE (p0 int, p1 int)
|
||||
LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
|
||||
UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
|
||||
UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
|
||||
RETURN QUERY
|
||||
SELECT id, val
|
||||
FROM multi_mx_function_call_delegation.mx_call_dist_table_1 t
|
||||
WHERE id >= x
|
||||
ORDER BY 1, 2;
|
||||
END;$$;
|
||||
|
||||
-- before distribution ...
|
||||
select mx_call_func_tbl(10);
|
||||
-- after distribution ...
|
||||
select create_distributed_function('mx_call_func_tbl(int)', '$1', 'mx_call_dist_table_1');
|
||||
select mx_call_func_tbl(20);
|
||||
|
||||
-- Test that we properly propagate errors raised from procedures.
|
||||
CREATE FUNCTION mx_call_func_raise(x int)
|
||||
RETURNS void LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
RAISE WARNING 'warning';
|
||||
RAISE EXCEPTION 'error';
|
||||
END;$$;
|
||||
select create_distributed_function('mx_call_func_raise(int)', '$1', 'mx_call_dist_table_1');
|
||||
select mx_call_func_raise(2);
|
||||
|
||||
-- Test that we don't propagate to non-metadata worker nodes
|
||||
select stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
select stop_metadata_sync_to_node('localhost', :worker_2_port);
|
||||
select mx_call_func(2, 0);
|
||||
SET client_min_messages TO NOTICE;
|
||||
select start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
select start_metadata_sync_to_node('localhost', :worker_2_port);
|
||||
|
||||
-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make
|
||||
-- worker backend caches inconsistent. Reconnect to coordinator to use
|
||||
-- new worker connections, hence new backends.
|
||||
\c - - - :master_port
|
||||
SET search_path to multi_mx_function_call_delegation, public;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
|
||||
--
|
||||
-- Test non-const parameter values
|
||||
--
|
||||
CREATE FUNCTION mx_call_add(int, int) RETURNS int
|
||||
AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE;
|
||||
SELECT create_distributed_function('mx_call_add(int,int)', '$1');
|
||||
|
||||
-- subquery parameters cannot be pushed down
|
||||
select mx_call_func((select x + 1 from mx_call_add(3, 4) x), 2);
|
||||
|
||||
-- volatile parameter cannot be pushed down
|
||||
select mx_call_func(floor(random())::int, 2);
|
||||
|
||||
-- test forms we don't distribute
|
||||
select * from mx_call_func(2, 0);
|
||||
select mx_call_func(2, 0) from mx_call_dist_table_1;
|
||||
select mx_call_func(2, 0) where mx_call_func(0, 2) = 0;
|
||||
select mx_call_func(2, 0), mx_call_func(0, 2);
|
||||
|
||||
DO $$ BEGIN perform mx_call_func_tbl(40); END; $$;
|
||||
SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val;
|
||||
|
||||
-- Prepared statements
|
||||
PREPARE call_plan (int, int) AS SELECT mx_call_func($1, $2);
|
||||
EXECUTE call_plan(2, 0);
|
||||
|
||||
RESET client_min_messages;
|
||||
\set VERBOSITY terse
|
||||
DROP SCHEMA multi_mx_function_call_delegation CASCADE;
|
|
@ -140,3 +140,16 @@ BEGIN
|
|||
EXECUTE p_sql;
|
||||
PERFORM run_command_on_workers(p_sql);
|
||||
END;$$;
|
||||
|
||||
-- 1. Marks the given procedure as colocated with the given table.
|
||||
-- 2. Marks the argument index with which we route the procedure.
|
||||
CREATE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
|
||||
RETURNS void LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
update citus.pg_dist_object
|
||||
set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid
|
||||
from pg_proc, pg_dist_partition
|
||||
where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid;
|
||||
END;$$;
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue