Implement SELECT function call delegation.

When a function is marked as colocated with a distributed table,
we try delegating queries of kind "SELECT func(...)" to workers.

We currently only support this simple form, and don't delegate
forms like "SELECT f1(...), f2(...)", "SELECT f1(...) FROM ...",
or function calls inside transactions.

As a side effect, we also fix the transactional semantics of DO blocks.
Previously we didn't consider a DO block a multi-statement transaction.
Now we do.

Co-authored-by: Marco Slot <marco@citusdata.com>
Co-authored-by: serprex <serprex@users.noreply.github.com>
Co-authored-by: pykello <hadi.moshayedi@microsoft.com>
pull/3021/head
Marco Slot 2019-09-23 14:51:54 -07:00 committed by Hadi Moshayedi
parent dab16be283
commit 2868e02a3d
17 changed files with 1331 additions and 137 deletions

View File

@ -253,6 +253,31 @@ multi_ProcessUtility(PlannedStmt *pstmt,
}
#endif
if (IsA(parsetree, DoStmt))
{
/*
* All statements in a DO block are executed in a single transaction,
* so we need to keep track of whether we are inside a DO block.
*/
DoBlockLevel += 1;
PG_TRY();
{
standard_ProcessUtility(pstmt, queryString, context,
params, queryEnv, dest, completionTag);
DoBlockLevel -= 1;
}
PG_CATCH();
{
DoBlockLevel -= 1;
PG_RE_THROW();
}
PG_END_TRY();
return;
}
/* process SET LOCAL stmts of whitelisted GUCs in multi-stmt xacts */
if (IsA(parsetree, VariableSetStmt))
{

View File

@ -87,6 +87,9 @@ bool EnableDeadlockPrevention = true;
/* number of nested stored procedure call levels we are currently in */
int StoredProcedureLevel = 0;
/* number of nested DO block levels we are currently in */
int DoBlockLevel = 0;
/* sort the returning to get consistent outputs */
bool SortReturning = false;

View File

@ -18,6 +18,7 @@
#include "catalog/pg_type.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_nodes.h"
#include "distributed/function_call_delegation.h"
#include "distributed/insert_select_planner.h"
#include "distributed/intermediate_results.h"
#include "distributed/metadata_cache.h"
@ -192,6 +193,14 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
AdjustPartitioningForDistributedPlanning(rangeTableList,
setPartitionedTablesInherited);
}
else
{
DistributedPlan *delegatePlan = TryToDelegateFunctionCall(parse);
if (delegatePlan != NULL)
{
result = FinalizePlan(result, delegatePlan);
}
}
}
PG_CATCH();
{

View File

@ -0,0 +1,287 @@
/*-------------------------------------------------------------------------
*
* function_call_delegation.c
* Planning logic for delegating a function call to a worker when the
* function was distributed with a distribution argument and the worker
* has metadata.
*
* Copyright (c), Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/connection_management.h"
#include "distributed/function_call_delegation.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/remote_commands.h"
#include "distributed/shard_pruning.h"
#include "distributed/version_compat.h"
#include "distributed/worker_manager.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/primnodes.h"
#include "optimizer/clauses.h"
#include "parser/parse_coerce.h"
#if PG_VERSION_NUM >= 120000
#include "parser/parsetree.h"
#endif
#include "miscadmin.h"
#include "tcop/dest.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
static bool contain_param_walker(Node *node, void *context);
/*
 * contain_param_walker is an expression_tree_walker callback that reports
 * whether the expression tree rooted at node contains a Param node at any
 * depth.
 *
 * node is the (sub)expression to inspect and may be NULL, in which case
 * there is nothing to find. context is unused and only passed through to
 * nested walker invocations.
 *
 * Returns true as soon as a Param is found, false otherwise.
 */
static bool
contain_param_walker(Node *node, void *context)
{
	if (node == NULL)
	{
		/* walkers are routinely handed NULL children, IsA() must not see them */
		return false;
	}

	if (IsA(node, Param))
	{
		return true;
	}

	if (IsA(node, Query))
	{
		/*
		 * expression_tree_walker hands us the Query of a SubLink but does not
		 * know how to descend into it, so recurse into subselects explicitly.
		 */
		return query_tree_walker((Query *) node, contain_param_walker, context, 0);
	}

	/* keep looking below this node, e.g. for a Param wrapped in a cast */
	return expression_tree_walker(node, contain_param_walker, context);
}
/*
 * TryToDelegateFunctionCall builds a distributed plan that runs the whole
 * query on a single worker, if possible.
 *
 * We only support delegating the SELECT func(...) form for distributed
 * functions colocated with distributed tables, and not more complicated
 * forms involving multiple function calls, FROM clauses, WHERE clauses,
 * ... Those complex forms are handled in the coordinator.
 *
 * query is the parsed query to inspect; it is deparsed as-is into the task
 * that is sent to the worker.
 *
 * Returns a DistributedPlan with a single router-executable task when the
 * call can be delegated, or NULL when it cannot. In the latter case the
 * caller is expected to continue with regular planning. Most reasons for
 * not delegating are reported at DEBUG1.
 */
DistributedPlan *
TryToDelegateFunctionCall(Query *query)
{
	FromExpr *joinTree = NULL;
	List *targetList = NIL;
	TargetEntry *targetEntry = NULL;
	FuncExpr *funcExpr = NULL;
	DistObjectCacheEntry *procedure = NULL;
	Oid colocatedRelationId = InvalidOid;
	Const *partitionValue = NULL;
	Datum partitionValueDatum = 0;
	ShardInterval *shardInterval = NULL;
	List *placementList = NIL;
	DistTableCacheEntry *distTable = NULL;
	Var *partitionColumn = NULL;
	ShardPlacement *placement = NULL;
	WorkerNode *workerNode = NULL;
	StringInfo queryString = NULL;
	Task *task = NULL;
	Job *job = NULL;
	DistributedPlan *distributedPlan = NULL;
	int32 groupId = 0;

	if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG4))
	{
		/* Citus is not ready to determine whether function is distributed */
		return NULL;
	}

	groupId = GetLocalGroupId();
	if (groupId != 0 || groupId == GROUP_ID_UPGRADING)
	{
		/* do not delegate from workers, or while upgrading */
		return NULL;
	}

	if (query == NULL)
	{
		/* no query (mostly here to be defensive) */
		return NULL;
	}

	joinTree = query->jointree;
	if (joinTree == NULL)
	{
		/* no join tree (mostly here to be defensive) */
		return NULL;
	}

	if (joinTree->quals != NULL)
	{
		/* query has a WHERE section */
		return NULL;
	}

	if (joinTree->fromlist != NIL)
	{
		/* query has a FROM section */
#if PG_VERSION_NUM >= 120000

		/* in pg12 empty FROMs are represented with an RTE_RESULT */
		Node *fromItem = NULL;
		RangeTblEntry *rtentry = NULL;

		if (list_length(joinTree->fromlist) != 1)
		{
			return NULL;
		}

		fromItem = (Node *) linitial(joinTree->fromlist);
		if (!IsA(fromItem, RangeTblRef))
		{
			/*
			 * The single FROM item can also be something else, such as an
			 * explicit join (JoinExpr). Reading rtindex from it would fetch
			 * a bogus range table entry, so bail out before casting.
			 */
			return NULL;
		}

		rtentry = rt_fetch(((RangeTblRef *) fromItem)->rtindex, query->rtable);
		if (rtentry->rtekind != RTE_RESULT)
		{
			/* a real FROM item, e.g. SELECT f() FROM rel */
			return NULL;
		}
#else
		return NULL;
#endif
	}

	targetList = query->targetList;
	if (list_length(targetList) != 1)
	{
		/* multiple target list items */
		return NULL;
	}

	targetEntry = (TargetEntry *) linitial(targetList);
	if (!IsA(targetEntry->expr, FuncExpr))
	{
		/* target list item is not a function call */
		return NULL;
	}

	funcExpr = (FuncExpr *) targetEntry->expr;
	procedure = LookupDistObjectCacheEntry(ProcedureRelationId, funcExpr->funcid, 0);
	if (procedure == NULL)
	{
		/* not a distributed function call */
		ereport(DEBUG4, (errmsg("function is not distributed")));
		return NULL;
	}

	if (IsMultiStatementTransaction())
	{
		/* cannot delegate function calls in a multi-statement transaction */
		ereport(DEBUG1, (errmsg("not pushing down function calls in "
								"a multi-statement transaction")));
		return NULL;
	}

	/* the distribution argument index must point into the actual arguments */
	if (procedure->distributionArgIndex < 0 ||
		procedure->distributionArgIndex >= list_length(funcExpr->args))
	{
		ereport(DEBUG1, (errmsg("function call does not have a distribution argument")));
		return NULL;
	}

	if (contain_volatile_functions((Node *) funcExpr->args))
	{
		ereport(DEBUG1, (errmsg("arguments in a distributed function must "
								"be constant expressions")));
		return NULL;
	}

	if (expression_tree_walker((Node *) funcExpr->args, contain_param_walker, NULL))
	{
		ereport(DEBUG1, (errmsg("arguments in a distributed function must "
								"not contain subqueries")));
		return NULL;
	}

	colocatedRelationId = ColocatedTableId(procedure->colocationId);
	if (colocatedRelationId == InvalidOid)
	{
		ereport(DEBUG1, (errmsg("function does not have co-located tables")));
		return NULL;
	}

	distTable = DistributedTableCacheEntry(colocatedRelationId);
	partitionColumn = distTable->partitionColumn;
	if (partitionColumn == NULL)
	{
		/* This can happen if colocated with a reference table. Punt for now. */
		ereport(DEBUG1, (errmsg(
							 "cannnot push down function call for reference tables")));
		return NULL;
	}

	/* the node tag is verified right below, before any Const field is read */
	partitionValue = (Const *) list_nth(funcExpr->args, procedure->distributionArgIndex);
	if (!IsA(partitionValue, Const))
	{
		ereport(DEBUG1, (errmsg("distribution argument value must be a constant")));
		return NULL;
	}

	if (partitionValue->constisnull)
	{
		/*
		 * constvalue carries no meaningful datum for a NULL constant, so it
		 * must not be coerced or used to look up a shard.
		 */
		ereport(DEBUG1, (errmsg("distribution argument value must not be NULL")));
		return NULL;
	}

	partitionValueDatum = partitionValue->constvalue;
	if (partitionValue->consttype != partitionColumn->vartype)
	{
		/* convert the argument to the type of the distribution column */
		CopyCoercionData coercionData;

		ConversionPathForTypes(partitionValue->consttype, partitionColumn->vartype,
							   &coercionData);
		partitionValueDatum = CoerceColumnValue(partitionValueDatum, &coercionData);
	}

	shardInterval = FindShardInterval(partitionValueDatum, distTable);
	if (shardInterval == NULL)
	{
		ereport(DEBUG1, (errmsg("cannot push down call, failed to find shard interval")));
		return NULL;
	}

	placementList = FinalizedShardPlacementList(shardInterval->shardId);
	if (list_length(placementList) != 1)
	{
		/* punt on this for now */
		ereport(DEBUG1, (errmsg(
							 "cannot push down function call for replicated distributed tables")));
		return NULL;
	}

	placement = (ShardPlacement *) linitial(placementList);
	workerNode = FindWorkerNode(placement->nodeName, placement->nodePort);
	if (workerNode == NULL || !workerNode->hasMetadata || !workerNode->metadataSynced)
	{
		ereport(DEBUG1, (errmsg("the worker node does not have metadata")));
		return NULL;
	}

	ereport(DEBUG1, (errmsg("pushing down the function call")));

	/* the worker receives the deparsed form of the original query */
	queryString = makeStringInfo();
	pg_get_query_def(query, queryString);

	task = CitusMakeNode(Task);
	task->taskType = SQL_TASK;
	task->queryString = queryString->data;
	task->taskPlacementList = placementList;
	task->anchorShardId = shardInterval->shardId;
	task->replicationModel = distTable->replicationModel;

	job = CitusMakeNode(Job);
	job->jobId = UniqueJobId();
	job->jobQuery = query;
	job->taskList = list_make1(task);

	distributedPlan = CitusMakeNode(DistributedPlan);
	distributedPlan->workerJob = job;
	distributedPlan->masterQuery = NULL;
	distributedPlan->routerExecutable = true;
	distributedPlan->hasReturning = false;

	/* worker will take care of any necessary locking, treat query as read-only */
	distributedPlan->modLevel = ROW_MODIFY_READONLY;

	return distributedPlan;
}

View File

@ -638,6 +638,11 @@ IsMultiStatementTransaction(void)
/* in a BEGIN...END block */
return true;
}
else if (DoBlockLevel > 0)
{
/* in (a transaction within) a do block */
return true;
}
else if (StoredProcedureLevel > 0)
{
/* in (a transaction within) a stored procedure */

View File

@ -3143,18 +3143,22 @@ GetLocalGroupId(void)
tupleDescriptor, &isNull);
groupId = DatumGetInt32(groupIdDatum);
/* set the local cache variable */
LocalGroupId = groupId;
}
else
{
elog(ERROR, "could not find any entries in pg_dist_local_group");
/*
* Upgrade is happening. When upgrading postgres, pg_dist_local_group is
* temporarily empty before citus_finish_pg_upgrade() finishes execution.
*/
groupId = GROUP_ID_UPGRADING;
}
systable_endscan(scanDescriptor);
heap_close(pgDistLocalGroupId, AccessShareLock);
/* set the local cache variable */
LocalGroupId = groupId;
return groupId;
}

View File

@ -0,0 +1,19 @@
/*
* function_call_delegation.h
* Declarations for public functions and variables used to delegate
* function calls to worker nodes.
*
* Copyright (c), Citus Data, Inc.
*/
#ifndef FUNCTION_CALL_DELEGATION_H
#define FUNCTION_CALL_DELEGATION_H
#include "postgres.h"
#include "distributed/multi_physical_planner.h"
/*
 * TryToDelegateFunctionCall inspects a query of the form SELECT func(...)
 * and returns a distributed plan that runs it on a worker, or NULL when the
 * call cannot be delegated.
 */
DistributedPlan * TryToDelegateFunctionCall(Query *query);
#endif /* FUNCTION_CALL_DELEGATION_H */

View File

@ -29,6 +29,14 @@ typedef enum
} ReadFromSecondariesType;
extern int ReadFromSecondaries;
/*
* While upgrading pg_dist_local_group can be empty temporarily, in that
* case we use GROUP_ID_UPGRADING as the local group id to communicate
* this to other functions.
*/
#define GROUP_ID_UPGRADING -2
/*
* Representation of a table's metadata that is frequently used for
* distributed execution. Cached.

View File

@ -91,6 +91,9 @@ extern dlist_head InProgressTransactions;
/* number of nested stored procedure call levels we are currently in */
extern int StoredProcedureLevel;
/* number of nested DO block levels we are currently in */
extern int DoBlockLevel;
/* number of nested function call levels we are currently in */
extern int FunctionCallLevel;

View File

@ -1,4 +1,6 @@
-- Test passing off CALL to mx workers
create schema multi_mx_call;
set search_path to multi_mx_call, public;
-- Create worker-local tables to test procedure calls were routed
set citus.shard_replication_factor to 2;
set citus.replication_model to 'statement';
@ -13,21 +15,6 @@ select create_distributed_table('mx_call_dist_table_replica', 'id');
insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5);
set citus.shard_replication_factor to 1;
set citus.replication_model to 'streaming';
create schema multi_mx_call;
set search_path to multi_mx_call, public;
--
-- Utility UDFs
--
-- 1. Marks the given procedure as colocated with the given table.
-- 2. Marks the argument index with which we route the procedure.
CREATE PROCEDURE colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
LANGUAGE plpgsql AS $$
BEGIN
update citus.pg_dist_object
set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid
from pg_proc, pg_dist_partition
where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid;
END;$$;
--
-- Create tables and procedures we want to use in tests
--
@ -96,6 +83,19 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A');
F | S
(1 row)
-- Same for unqualified names
call mx_call_proc(2, 0);
y
----
29
(1 row)
call mx_call_proc_custom_types('S', 'A');
x | y
---+---
F | S
(1 row)
-- Mark both procedures as distributed ...
select create_distributed_function('mx_call_proc(int,int)');
create_distributed_function
@ -114,10 +114,10 @@ select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_ca
SET client_min_messages TO DEBUG1;
call multi_mx_call.mx_call_proc(2, 0);
DEBUG: stored procedure does not have co-located tables
DEBUG: generating subplan 8_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
DEBUG: generating subplan 10_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
DEBUG: Plan 8 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
y
@ -133,8 +133,18 @@ DEBUG: stored procedure does not have co-located tables
(1 row)
-- Mark them as colocated with a table. Now we should route them to workers.
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
call multi_mx_call.colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
colocate_proc_with_table
--------------------------
(1 row)
select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
colocate_proc_with_table
--------------------------
(1 row)
call multi_mx_call.mx_call_proc(2, 0);
DEBUG: pushing down the procedure
y
@ -149,14 +159,28 @@ DEBUG: pushing down the procedure
S | S
(1 row)
call mx_call_proc(2, 0);
DEBUG: pushing down the procedure
y
----
28
(1 row)
call mx_call_proc_custom_types('S', 'A');
DEBUG: pushing down the procedure
x | y
---+---
S | S
(1 row)
-- We don't allow distributing calls inside transactions
begin;
call multi_mx_call.mx_call_proc(2, 0);
DEBUG: cannot push down CALL in multi-statement transaction
DEBUG: generating subplan 10_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
DEBUG: generating subplan 12_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
DEBUG: Plan 12 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
y
@ -179,21 +203,12 @@ DEBUG: stored procedure does not have co-located tables
-- Make sure we do bounds checking on distributed argument index
-- This also tests that we have cache invalidation for pg_dist_object updates
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1);
call multi_mx_call.mx_call_proc(2, 0);
DEBUG: cannot push down invalid distribution_argument_index
DEBUG: generating subplan 12_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
DEBUG: Plan 12 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
y
----
29
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1);
colocate_proc_with_table
--------------------------
(1 row)
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2);
call multi_mx_call.mx_call_proc(2, 0);
DEBUG: cannot push down invalid distribution_argument_index
DEBUG: generating subplan 14_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
@ -207,10 +222,14 @@ PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
29
(1 row)
-- We don't currently support colocating with reference tables
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1);
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2);
colocate_proc_with_table
--------------------------
(1 row)
call multi_mx_call.mx_call_proc(2, 0);
DEBUG: cannot push down CALL for reference tables
DEBUG: cannot push down invalid distribution_argument_index
DEBUG: generating subplan 17_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
@ -222,10 +241,15 @@ PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
29
(1 row)
-- We don't currently support colocating with replicated tables
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1);
-- We don't currently support colocating with reference tables
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1);
colocate_proc_with_table
--------------------------
(1 row)
call multi_mx_call.mx_call_proc(2, 0);
DEBUG: cannot push down CALL for replicated distributed tables
DEBUG: cannot push down CALL for reference tables
DEBUG: generating subplan 19_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
@ -237,20 +261,45 @@ PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
29
(1 row)
-- We don't currently support colocating with replicated tables
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1);
colocate_proc_with_table
--------------------------
(1 row)
call multi_mx_call.mx_call_proc(2, 0);
DEBUG: cannot push down CALL for replicated distributed tables
DEBUG: generating subplan 21_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
DEBUG: Plan 21 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('21_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
y
----
29
(1 row)
SET client_min_messages TO NOTICE;
drop table mx_call_dist_table_replica;
SET client_min_messages TO DEBUG1;
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
colocate_proc_with_table
--------------------------
(1 row)
-- Test that we handle transactional constructs correctly inside a procedure
-- that is routed to the workers.
CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
INSERT INTO multi_mx_call.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
COMMIT;
UPDATE mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
ROLLBACK;
-- Now do the final update!
UPDATE mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
END;$$;
-- before distribution ...
CALL multi_mx_call.mx_call_proc_tx(10);
@ -265,9 +314,6 @@ DETAIL: A distributed function is created. To make sure subsequent commands see
CALL multi_mx_call.mx_call_proc_tx(20);
DEBUG: pushing down the procedure
ERROR: relation "mx_call_dist_table_1" does not exist
CONTEXT: while executing command on localhost:57637
PL/pgSQL function multi_mx_call.mx_call_proc_tx(integer) line 3 at SQL statement
SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val;
id | val
----+-----
@ -278,7 +324,9 @@ SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val;
9 | 2
10 | -2
11 | 3
(7 rows)
20 | -2
21 | 3
(9 rows)
-- Test that we properly propagate errors raised from procedures.
CREATE PROCEDURE mx_call_proc_raise(x int) LANGUAGE plpgsql AS $$
@ -316,10 +364,10 @@ select stop_metadata_sync_to_node('localhost', :worker_2_port);
call multi_mx_call.mx_call_proc(2, 0);
DEBUG: there is no worker node with metadata
DEBUG: generating subplan 25_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
DEBUG: generating subplan 27_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
DEBUG: Plan 25 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('25_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
DEBUG: Plan 27 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('27_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
y
@ -340,6 +388,11 @@ select start_metadata_sync_to_node('localhost', :worker_2_port);
(1 row)
-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make
-- worker backend caches inconsistent. Reconnect to coordinator to use
-- new worker connections, hence new backends.
\c - - - :master_port
SET search_path to multi_mx_call, public;
SET client_min_messages TO DEBUG1;
--
-- Test non-const parameter values
@ -357,10 +410,10 @@ DETAIL: A distributed function is created. To make sure subsequent commands see
-- non-const distribution parameters cannot be pushed down
call multi_mx_call.mx_call_proc(2, mx_call_add(3, 4));
DEBUG: distribution argument value must be a constant
DEBUG: generating subplan 27_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
DEBUG: generating subplan 1_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
DEBUG: Plan 27 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('27_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
DEBUG: Plan 1 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('1_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
y
@ -379,10 +432,10 @@ DEBUG: pushing down the procedure
-- volatile parameter cannot be pushed down
call multi_mx_call.mx_call_proc(floor(random())::int, 2);
DEBUG: arguments in a distributed stored procedure must be constant expressions
DEBUG: generating subplan 29_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
DEBUG: generating subplan 3_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
DEBUG: Plan 29 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('29_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
DEBUG: Plan 3 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('3_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
y
@ -393,4 +446,4 @@ PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment
reset client_min_messages;
\set VERBOSITY terse
drop schema multi_mx_call cascade;
NOTICE: drop cascades to 10 other objects
NOTICE: drop cascades to 9 other objects

View File

@ -1,4 +1,6 @@
-- Test passing off CALL to mx workers
create schema multi_mx_call;
set search_path to multi_mx_call, public;
-- Create worker-local tables to test procedure calls were routed
set citus.shard_replication_factor to 2;
set citus.replication_model to 'statement';
@ -13,24 +15,6 @@ select create_distributed_table('mx_call_dist_table_replica', 'id');
insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5);
set citus.shard_replication_factor to 1;
set citus.replication_model to 'streaming';
create schema multi_mx_call;
set search_path to multi_mx_call, public;
--
-- Utility UDFs
--
-- 1. Marks the given procedure as colocated with the given table.
-- 2. Marks the argument index with which we route the procedure.
CREATE PROCEDURE colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
LANGUAGE plpgsql AS $$
BEGIN
update citus.pg_dist_object
set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid
from pg_proc, pg_dist_partition
where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid;
END;$$;
ERROR: syntax error at or near "PROCEDURE"
LINE 1: CREATE PROCEDURE colocate_proc_with_table(procname text, tab...
^
--
-- Create tables and procedures we want to use in tests
--
@ -101,6 +85,15 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A');
ERROR: syntax error at or near "call"
LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A');
^
-- Same for unqualified names
call mx_call_proc(2, 0);
ERROR: syntax error at or near "call"
LINE 1: call mx_call_proc(2, 0);
^
call mx_call_proc_custom_types('S', 'A');
ERROR: syntax error at or near "call"
LINE 1: call mx_call_proc_custom_types('S', 'A');
^
-- Mark both procedures as distributed ...
select create_distributed_function('mx_call_proc(int,int)');
ERROR: function "mx_call_proc(int,int)" does not exist
@ -122,14 +115,18 @@ ERROR: syntax error at or near "call"
LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A');
^
-- Mark them as colocated with a table. Now we should route them to workers.
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
ERROR: syntax error at or near "call"
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ...
^
call multi_mx_call.colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
ERROR: syntax error at or near "call"
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc_cu...
^
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
colocate_proc_with_table
--------------------------
(1 row)
select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
colocate_proc_with_table
--------------------------
(1 row)
call multi_mx_call.mx_call_proc(2, 0);
ERROR: syntax error at or near "call"
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
@ -138,6 +135,14 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A');
ERROR: syntax error at or near "call"
LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A');
^
call mx_call_proc(2, 0);
ERROR: syntax error at or near "call"
LINE 1: call mx_call_proc(2, 0);
^
call mx_call_proc_custom_types('S', 'A');
ERROR: syntax error at or near "call"
LINE 1: call mx_call_proc_custom_types('S', 'A');
^
-- We don't allow distributing calls inside transactions
begin;
call multi_mx_call.mx_call_proc(2, 0);
@ -156,36 +161,44 @@ LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A');
^
-- Make sure we do bounds checking on distributed argument index
-- This also tests that we have cache invalidation for pg_dist_object updates
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1);
ERROR: syntax error at or near "call"
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ...
^
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1);
colocate_proc_with_table
--------------------------
(1 row)
call multi_mx_call.mx_call_proc(2, 0);
ERROR: syntax error at or near "call"
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
^
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2);
ERROR: syntax error at or near "call"
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ...
^
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2);
colocate_proc_with_table
--------------------------
(1 row)
call multi_mx_call.mx_call_proc(2, 0);
ERROR: syntax error at or near "call"
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
^
-- We don't currently support colocating with reference tables
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1);
ERROR: syntax error at or near "call"
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ...
^
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1);
colocate_proc_with_table
--------------------------
(1 row)
call multi_mx_call.mx_call_proc(2, 0);
ERROR: syntax error at or near "call"
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
^
-- We don't currently support colocating with replicated tables
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1);
ERROR: syntax error at or near "call"
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ...
^
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1);
colocate_proc_with_table
--------------------------
(1 row)
call multi_mx_call.mx_call_proc(2, 0);
ERROR: syntax error at or near "call"
LINE 1: call multi_mx_call.mx_call_proc(2, 0);
@ -193,20 +206,22 @@ LINE 1: call multi_mx_call.mx_call_proc(2, 0);
SET client_min_messages TO NOTICE;
drop table mx_call_dist_table_replica;
SET client_min_messages TO DEBUG1;
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
ERROR: syntax error at or near "call"
LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ...
^
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
colocate_proc_with_table
--------------------------
(1 row)
-- Test that we handle transactional constructs correctly inside a procedure
-- that is routed to the workers.
CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
INSERT INTO multi_mx_call.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
COMMIT;
UPDATE mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
ROLLBACK;
-- Now do the final update!
UPDATE mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
END;$$;
ERROR: syntax error at or near "PROCEDURE"
LINE 1: CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS ...
@ -282,6 +297,11 @@ select start_metadata_sync_to_node('localhost', :worker_2_port);
(1 row)
-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make
-- worker backend caches inconsistent. Reconnect to coordinator to use
-- new worker connections, hence new backends.
\c - - - :master_port
SET search_path to multi_mx_call, public;
SET client_min_messages TO DEBUG1;
--
-- Test non-const parameter values

View File

@ -0,0 +1,539 @@
-- Test passing off function call to mx workers
CREATE SCHEMA multi_mx_function_call_delegation;
SET search_path TO multi_mx_function_call_delegation, public;
SET citus.shard_replication_factor TO 2;
SET citus.replication_model TO 'statement';
-- This table requires specific settings, create before getting into things
create table mx_call_dist_table_replica(id int, val int);
select create_distributed_table('mx_call_dist_table_replica', 'id');
create_distributed_table
--------------------------
(1 row)
insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5);
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'streaming';
--
-- Create tables and functions we want to use in tests
--
create table mx_call_dist_table_1(id int, val int);
select create_distributed_table('mx_call_dist_table_1', 'id');
create_distributed_table
--------------------------
(1 row)
insert into mx_call_dist_table_1 values (3,1),(4,5),(9,2),(6,5),(3,5);
create table mx_call_dist_table_2(id int, val int);
select create_distributed_table('mx_call_dist_table_2', 'id');
create_distributed_table
--------------------------
(1 row)
insert into mx_call_dist_table_2 values (1,1),(1,2),(2,2),(3,3),(3,4);
create table mx_call_dist_table_ref(id int, val int);
select create_reference_table('mx_call_dist_table_ref');
create_reference_table
------------------------
(1 row)
insert into mx_call_dist_table_ref values (2,7),(1,8),(2,8),(1,8),(2,8);
create type mx_call_enum as enum ('A', 'S', 'D', 'F');
create table mx_call_dist_table_enum(id int, key mx_call_enum);
select create_distributed_table('mx_call_dist_table_enum', 'key');
create_distributed_table
--------------------------
(1 row)
insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F');
CREATE FUNCTION squares(int) RETURNS SETOF RECORD
AS $$ SELECT i, i * i FROM generate_series(1, $1) i $$
LANGUAGE SQL;
CREATE FUNCTION mx_call_func(x int, INOUT y int)
LANGUAGE plpgsql AS $$
BEGIN
-- groupid is 0 in coordinator and non-zero in workers, so by using it here
-- we make sure the function is being executed in the worker.
y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group);
-- we also make sure that we can run distributed queries in the functions
-- that are routed to the workers.
y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id);
END;$$;
-- create another function which verifies:
-- 1. we work fine with multiple return columns
-- 2. we work fine in combination with custom types
CREATE FUNCTION mx_call_func_custom_types(INOUT x mx_call_enum, INOUT y mx_call_enum)
LANGUAGE plpgsql AS $$
BEGIN
y := x;
x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
END;$$;
-- Test that undistributed functions have no issue executing
select multi_mx_function_call_delegation.mx_call_func(2, 0);
mx_call_func
--------------
29
(1 row)
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
mx_call_func_custom_types
---------------------------
(F,S)
(1 row)
select squares(4);
squares
---------
(1,1)
(2,4)
(3,9)
(4,16)
(4 rows)
-- Same for unqualified name
select mx_call_func(2, 0);
mx_call_func
--------------
29
(1 row)
-- Mark both functions as distributed ...
select create_distributed_function('mx_call_func(int,int)');
create_distributed_function
-----------------------------
(1 row)
select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_call_enum)');
create_distributed_function
-----------------------------
(1 row)
select create_distributed_function('squares(int)');
create_distributed_function
-----------------------------
(1 row)
-- We still don't route them to the workers, because they aren't
-- colocated with any distributed tables.
SET client_min_messages TO DEBUG1;
select mx_call_func(2, 0);
DEBUG: function does not have co-located tables
DEBUG: generating subplan 10_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
--------------
29
(1 row)
select mx_call_func_custom_types('S', 'A');
DEBUG: function does not have co-located tables
mx_call_func_custom_types
---------------------------
(F,S)
(1 row)
-- Mark them as colocated with a table. Now we should route them to workers.
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1);
colocate_proc_with_table
--------------------------
(1 row)
select colocate_proc_with_table('mx_call_func_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
colocate_proc_with_table
--------------------------
(1 row)
select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0);
colocate_proc_with_table
--------------------------
(1 row)
select mx_call_func(2, 0);
DEBUG: pushing down the function call
mx_call_func
--------------
28
(1 row)
select mx_call_func_custom_types('S', 'A');
DEBUG: pushing down the function call
mx_call_func_custom_types
---------------------------
(S,S)
(1 row)
select squares(4);
DEBUG: pushing down the function call
ERROR: input of anonymous composite types is not implemented
select multi_mx_function_call_delegation.mx_call_func(2, 0);
DEBUG: pushing down the function call
mx_call_func
--------------
28
(1 row)
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
DEBUG: pushing down the function call
mx_call_func_custom_types
---------------------------
(S,S)
(1 row)
-- We don't allow distributing calls inside transactions
begin;
select mx_call_func(2, 0);
DEBUG: not pushing down function calls in a multi-statement transaction
DEBUG: generating subplan 12_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
DEBUG: Plan 12 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
--------------
29
(1 row)
commit;
-- Drop the table colocated with mx_call_func_custom_types. Now it shouldn't
-- be routed to workers anymore.
SET client_min_messages TO NOTICE;
drop table mx_call_dist_table_enum;
SET client_min_messages TO DEBUG1;
select mx_call_func_custom_types('S', 'A');
DEBUG: function does not have co-located tables
mx_call_func_custom_types
---------------------------
(F,S)
(1 row)
-- Make sure we do bounds checking on distributed argument index
-- This also tests that we have cache invalidation for pg_dist_object updates
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, -1);
colocate_proc_with_table
--------------------------
(1 row)
select mx_call_func(2, 0);
DEBUG: function call does not have a distribution argument
DEBUG: generating subplan 14_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
DEBUG: Plan 14 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('14_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
--------------
29
(1 row)
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 2);
colocate_proc_with_table
--------------------------
(1 row)
select mx_call_func(2, 0);
DEBUG: function call does not have a distribution argument
DEBUG: generating subplan 17_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
DEBUG: Plan 17 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('17_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
--------------
29
(1 row)
-- We don't currently support colocating with reference tables
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_ref'::regclass, 1);
colocate_proc_with_table
--------------------------
(1 row)
select mx_call_func(2, 0);
DEBUG: cannnot push down function call for reference tables
DEBUG: generating subplan 19_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
DEBUG: Plan 19 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('19_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
--------------
29
(1 row)
-- We don't currently support colocating with replicated tables
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_replica'::regclass, 1);
colocate_proc_with_table
--------------------------
(1 row)
select mx_call_func(2, 0);
DEBUG: cannot push down function call for replicated distributed tables
DEBUG: generating subplan 21_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
DEBUG: Plan 21 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('21_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
--------------
29
(1 row)
SET client_min_messages TO NOTICE;
drop table mx_call_dist_table_replica;
SET client_min_messages TO DEBUG1;
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1);
colocate_proc_with_table
--------------------------
(1 row)
-- Test table returning functions.
CREATE FUNCTION mx_call_func_tbl(x int)
RETURNS TABLE (p0 int, p1 int)
LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
RETURN QUERY
SELECT id, val
FROM multi_mx_function_call_delegation.mx_call_dist_table_1 t
WHERE id >= x
ORDER BY 1, 2;
END;$$;
-- before distribution ...
select mx_call_func_tbl(10);
mx_call_func_tbl
------------------
(10,-1)
(11,4)
(2 rows)
-- after distribution ...
select create_distributed_function('mx_call_func_tbl(int)', '$1', 'mx_call_dist_table_1');
DEBUG: switching to sequential query execution mode
DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
create_distributed_function
-----------------------------
(1 row)
select mx_call_func_tbl(20);
DEBUG: pushing down the function call
mx_call_func_tbl
------------------
(20,-1)
(21,4)
(2 rows)
-- Test that we properly propagate errors raised from procedures.
CREATE FUNCTION mx_call_func_raise(x int)
RETURNS void LANGUAGE plpgsql AS $$
BEGIN
RAISE WARNING 'warning';
RAISE EXCEPTION 'error';
END;$$;
select create_distributed_function('mx_call_func_raise(int)', '$1', 'mx_call_dist_table_1');
DEBUG: switching to sequential query execution mode
DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
create_distributed_function
-----------------------------
(1 row)
select mx_call_func_raise(2);
DEBUG: pushing down the function call
DEBUG: warning
DETAIL: WARNING from localhost:57638
ERROR: error
CONTEXT: while executing command on localhost:57638
PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_raise(integer) line 4 at RAISE
-- Test that we don't propagate to non-metadata worker nodes
select stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
----------------------------
(1 row)
select stop_metadata_sync_to_node('localhost', :worker_2_port);
stop_metadata_sync_to_node
----------------------------
(1 row)
select mx_call_func(2, 0);
DEBUG: the worker node does not have metadata
DEBUG: generating subplan 27_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
DEBUG: Plan 27 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('27_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
--------------
29
(1 row)
SET client_min_messages TO NOTICE;
select start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
select start_metadata_sync_to_node('localhost', :worker_2_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make
-- worker backend caches inconsistent. Reconnect to coordinator to use
-- new worker connections, hence new backends.
\c - - - :master_port
SET search_path to multi_mx_function_call_delegation, public;
SET client_min_messages TO DEBUG1;
--
-- Test non-const parameter values
--
CREATE FUNCTION mx_call_add(int, int) RETURNS int
AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE;
SELECT create_distributed_function('mx_call_add(int,int)', '$1');
DEBUG: switching to sequential query execution mode
DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
create_distributed_function
-----------------------------
(1 row)
-- subquery parameters cannot be pushed down
select mx_call_func((select x + 1 from mx_call_add(3, 4) x), 2);
DEBUG: arguments in a distributed function must not contain subqueries
DEBUG: generating subplan 1_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
DEBUG: Plan 1 query after replacing subqueries and CTEs: SELECT (9 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('1_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
--------------
35
(1 row)
-- volatile parameter cannot be pushed down
select mx_call_func(floor(random())::int, 2);
DEBUG: arguments in a distributed function must be constant expressions
DEBUG: generating subplan 3_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
DEBUG: Plan 3 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('3_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
--------------
27
(1 row)
-- test forms we don't distribute
select * from mx_call_func(2, 0);
DEBUG: generating subplan 5_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
DEBUG: Plan 5 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('5_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
y
----
29
(1 row)
select mx_call_func(2, 0) from mx_call_dist_table_1;
mx_call_func
--------------
28
28
28
28
28
28
28
28
28
(9 rows)
select mx_call_func(2, 0) where mx_call_func(0, 2) = 0;
DEBUG: generating subplan 8_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
DEBUG: Plan 8 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func
--------------
(0 rows)
select mx_call_func(2, 0), mx_call_func(0, 2);
DEBUG: generating subplan 10_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
DEBUG: generating subplan 13_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
DEBUG: Plan 13 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('13_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment
mx_call_func | mx_call_func
--------------+--------------
29 | 27
(1 row)
DO $$ BEGIN perform mx_call_func_tbl(40); END; $$;
DEBUG: not pushing down function calls in a multi-statement transaction
CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)"
PL/pgSQL function inline_code_block line 1 at PERFORM
SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val;
id | val
----+-----
40 | -1
41 | 4
(2 rows)
-- Prepared statements
PREPARE call_plan (int, int) AS SELECT mx_call_func($1, $2);
EXECUTE call_plan(2, 0);
DEBUG: pushing down the function call
mx_call_func
--------------
28
(1 row)
RESET client_min_messages;
\set VERBOSITY terse
DROP SCHEMA multi_mx_function_call_delegation CASCADE;
NOTICE: drop cascades to 10 other objects

View File

@ -140,3 +140,13 @@ BEGIN
EXECUTE p_sql;
PERFORM run_command_on_workers(p_sql);
END;$$;
-- 1. Marks the given procedure as colocated with the given table.
-- 2. Marks the argument index with which we route the procedure.
CREATE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
RETURNS void LANGUAGE plpgsql AS $$
BEGIN
update citus.pg_dist_object
set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid
from pg_proc, pg_dist_partition
where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid;
END;$$;

View File

@ -33,6 +33,7 @@ test: multi_mx_repartition_udt_prepare mx_foreign_key_to_reference_table
test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
test: multi_mx_metadata
test: multi_mx_call
test: multi_mx_function_call_delegation
test: multi_mx_modifications local_shard_execution
test: multi_mx_transaction_recovery
test: multi_mx_modifying_xacts

View File

@ -1,5 +1,8 @@
-- Test passing off CALL to mx workers
create schema multi_mx_call;
set search_path to multi_mx_call, public;
-- Create worker-local tables to test procedure calls were routed
set citus.shard_replication_factor to 2;
@ -13,25 +16,6 @@ insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5);
set citus.shard_replication_factor to 1;
set citus.replication_model to 'streaming';
create schema multi_mx_call;
set search_path to multi_mx_call, public;
--
-- Utility UDFs
--
-- 1. Marks the given procedure as colocated with the given table.
-- 2. Marks the argument index with which we route the procedure.
CREATE PROCEDURE colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
LANGUAGE plpgsql AS $$
BEGIN
update citus.pg_dist_object
set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid
from pg_proc, pg_dist_partition
where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid;
END;$$;
--
-- Create tables and procedures we want to use in tests
--
@ -78,6 +62,10 @@ END;$$;
call multi_mx_call.mx_call_proc(2, 0);
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
-- Same for unqualified names
call mx_call_proc(2, 0);
call mx_call_proc_custom_types('S', 'A');
-- Mark both procedures as distributed ...
select create_distributed_function('mx_call_proc(int,int)');
select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_call_enum)');
@ -89,10 +77,13 @@ call multi_mx_call.mx_call_proc(2, 0);
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
-- Mark them as colocated with a table. Now we should route them to workers.
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
call multi_mx_call.colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
call multi_mx_call.mx_call_proc(2, 0);
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
call mx_call_proc(2, 0);
call mx_call_proc_custom_types('S', 'A');
-- We don't allow distributing calls inside transactions
begin;
@ -108,34 +99,34 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A');
-- Make sure we do bounds checking on distributed argument index
-- This also tests that we have cache invalidation for pg_dist_object updates
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1);
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1);
call multi_mx_call.mx_call_proc(2, 0);
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2);
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2);
call multi_mx_call.mx_call_proc(2, 0);
-- We don't currently support colocating with reference tables
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1);
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1);
call multi_mx_call.mx_call_proc(2, 0);
-- We don't currently support colocating with replicated tables
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1);
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1);
call multi_mx_call.mx_call_proc(2, 0);
SET client_min_messages TO NOTICE;
drop table mx_call_dist_table_replica;
SET client_min_messages TO DEBUG1;
call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
-- Test that we handle transactional constructs correctly inside a procedure
-- that is routed to the workers.
CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
INSERT INTO multi_mx_call.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
COMMIT;
UPDATE mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
ROLLBACK;
-- Now do the final update!
UPDATE mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
END;$$;
-- before distribution ...
@ -162,6 +153,12 @@ call multi_mx_call.mx_call_proc(2, 0);
SET client_min_messages TO NOTICE;
select start_metadata_sync_to_node('localhost', :worker_1_port);
select start_metadata_sync_to_node('localhost', :worker_2_port);
-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make
-- worker backend caches inconsistent. Reconnect to coordinator to use
-- new worker connections, hence new backends.
\c - - - :master_port
SET search_path to multi_mx_call, public;
SET client_min_messages TO DEBUG1;
--

View File

@ -0,0 +1,198 @@
-- Test passing off function call to mx workers
CREATE SCHEMA multi_mx_function_call_delegation;
SET search_path TO multi_mx_function_call_delegation, public;
SET citus.shard_replication_factor TO 2;
SET citus.replication_model TO 'statement';
-- This table requires specific settings, create before getting into things
create table mx_call_dist_table_replica(id int, val int);
select create_distributed_table('mx_call_dist_table_replica', 'id');
insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5);
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'streaming';
--
-- Create tables and functions we want to use in tests
--
create table mx_call_dist_table_1(id int, val int);
select create_distributed_table('mx_call_dist_table_1', 'id');
insert into mx_call_dist_table_1 values (3,1),(4,5),(9,2),(6,5),(3,5);
create table mx_call_dist_table_2(id int, val int);
select create_distributed_table('mx_call_dist_table_2', 'id');
insert into mx_call_dist_table_2 values (1,1),(1,2),(2,2),(3,3),(3,4);
create table mx_call_dist_table_ref(id int, val int);
select create_reference_table('mx_call_dist_table_ref');
insert into mx_call_dist_table_ref values (2,7),(1,8),(2,8),(1,8),(2,8);
create type mx_call_enum as enum ('A', 'S', 'D', 'F');
create table mx_call_dist_table_enum(id int, key mx_call_enum);
select create_distributed_table('mx_call_dist_table_enum', 'key');
insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F');
-- squares(n): yields one record (i, i * i) for every i in 1..n.
CREATE FUNCTION squares(int)
RETURNS SETOF RECORD
LANGUAGE SQL
AS $$
    SELECT n, n * n
    FROM generate_series(1, $1) AS n
$$;
-- Test function used to observe where a call is executed.
--   x: plain input.
--   y: INOUT, but its incoming value is overwritten before it is ever read,
--      so only x influences the result.
-- Result: x, plus 1 when pg_dist_local_group.groupid is 0 (plus 0 otherwise),
-- plus sum(t1.val + t2.val) over mx_call_dist_table_1 joined to
-- mx_call_dist_table_2 on id.
-- NOTE(review): the expected test output quotes the second assignment verbatim
-- and reports it as "line 8 at assignment"; keep the body text and its line
-- layout unchanged, or regenerate the expected output.
CREATE FUNCTION mx_call_func(x int, INOUT y int)
LANGUAGE plpgsql AS $$
BEGIN
-- groupid is 0 in coordinator and non-zero in workers, so by using it here
-- we make sure the function is being executed in the worker.
y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group);
-- we also make sure that we can run distributed queries in the functions
-- that are routed to the workers.
y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id);
END;$$;
-- A second test function. It verifies that calls still work when
--   1. more than one column is returned, and
--   2. the arguments and results use a custom (enum) type.
CREATE FUNCTION mx_call_func_custom_types(INOUT x mx_call_enum, INOUT y mx_call_enum)
LANGUAGE plpgsql AS $$
BEGIN
    -- hand the incoming x back through y before x gets overwritten
    y := x;
    -- 'F' where pg_dist_local_group.groupid is 0, 'S' everywhere else
    x := (SELECT CASE WHEN groupid = 0 THEN 'F' ELSE 'S' END
          FROM pg_dist_local_group);
END;$$;
-- Test that undistributed functions have no issue executing
select multi_mx_function_call_delegation.mx_call_func(2, 0);
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
select squares(4);
-- Same for unqualified name
select mx_call_func(2, 0);
-- Mark all three functions as distributed ...
select create_distributed_function('mx_call_func(int,int)');
select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_call_enum)');
select create_distributed_function('squares(int)');
-- We still don't route them to the workers, because they aren't
-- colocated with any distributed tables.
SET client_min_messages TO DEBUG1;
select mx_call_func(2, 0);
select mx_call_func_custom_types('S', 'A');
-- Mark them as colocated with a table. Now we should route them to workers.
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1);
select colocate_proc_with_table('mx_call_func_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0);
select mx_call_func(2, 0);
select mx_call_func_custom_types('S', 'A');
select squares(4);
select multi_mx_function_call_delegation.mx_call_func(2, 0);
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
-- We don't allow distributing calls inside transactions
begin;
select mx_call_func(2, 0);
commit;
-- Drop the table colocated with mx_call_func_custom_types. Now it shouldn't
-- be routed to workers anymore.
SET client_min_messages TO NOTICE;
drop table mx_call_dist_table_enum;
SET client_min_messages TO DEBUG1;
select mx_call_func_custom_types('S', 'A');
-- Make sure we do bounds checking on distributed argument index
-- This also tests that we have cache invalidation for pg_dist_object updates
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, -1);
select mx_call_func(2, 0);
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 2);
select mx_call_func(2, 0);
-- We don't currently support colocating with reference tables
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_ref'::regclass, 1);
select mx_call_func(2, 0);
-- We don't currently support colocating with replicated tables
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_replica'::regclass, 1);
select mx_call_func(2, 0);
SET client_min_messages TO NOTICE;
drop table mx_call_dist_table_replica;
SET client_min_messages TO DEBUG1;
select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1);
-- Test table returning functions.
-- mx_call_func_tbl(x):
--   1. inserts the rows (x, -1) and (x+1, 4) into mx_call_dist_table_1;
--   2. adds 1 to val, then subtracts 1 from val, for every row with id >= x
--      (the two UPDATEs cancel each other out);
--   3. returns every row with id >= x as (p0, p1) = (id, val), ordered by
--      id and then val.
CREATE FUNCTION mx_call_func_tbl(x int)
RETURNS TABLE (p0 int, p1 int)
LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4);
UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val+1 WHERE id >= x;
UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val-1 WHERE id >= x;
RETURN QUERY
SELECT id, val
FROM multi_mx_function_call_delegation.mx_call_dist_table_1 t
WHERE id >= x
ORDER BY 1, 2;
END;$$;
-- before distribution ...
select mx_call_func_tbl(10);
-- after distribution ...
select create_distributed_function('mx_call_func_tbl(int)', '$1', 'mx_call_dist_table_1');
select mx_call_func_tbl(20);
-- Test that we properly propagate warnings and errors raised from functions.
-- mx_call_func_raise(x) first emits WARNING 'warning' and then aborts with
-- EXCEPTION 'error'. The body never reads x.
CREATE FUNCTION mx_call_func_raise(x int)
RETURNS void LANGUAGE plpgsql AS $$
BEGIN
RAISE WARNING 'warning';
RAISE EXCEPTION 'error';
END;$$;
select create_distributed_function('mx_call_func_raise(int)', '$1', 'mx_call_dist_table_1');
select mx_call_func_raise(2);
-- Test that we don't propagate to non-metadata worker nodes
select stop_metadata_sync_to_node('localhost', :worker_1_port);
select stop_metadata_sync_to_node('localhost', :worker_2_port);
select mx_call_func(2, 0);
SET client_min_messages TO NOTICE;
select start_metadata_sync_to_node('localhost', :worker_1_port);
select start_metadata_sync_to_node('localhost', :worker_2_port);
-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make
-- worker backend caches inconsistent. Reconnect to coordinator to use
-- new worker connections, hence new backends.
\c - - - :master_port
SET search_path to multi_mx_function_call_delegation, public;
SET client_min_messages TO DEBUG1;
--
-- Test non-const parameter values
--
-- mx_call_add(a, b): integer sum of its two arguments; declared IMMUTABLE.
CREATE FUNCTION mx_call_add(int, int)
RETURNS int
LANGUAGE SQL
IMMUTABLE
AS $$ SELECT $1 + $2 $$;
SELECT create_distributed_function('mx_call_add(int,int)', '$1');
-- subquery parameters cannot be pushed down
select mx_call_func((select x + 1 from mx_call_add(3, 4) x), 2);
-- volatile parameter cannot be pushed down
select mx_call_func(floor(random())::int, 2);
-- test forms we don't distribute
select * from mx_call_func(2, 0);
select mx_call_func(2, 0) from mx_call_dist_table_1;
select mx_call_func(2, 0) where mx_call_func(0, 2) = 0;
select mx_call_func(2, 0), mx_call_func(0, 2);
DO $$ BEGIN perform mx_call_func_tbl(40); END; $$;
SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val;
-- Prepared statements
PREPARE call_plan (int, int) AS SELECT mx_call_func($1, $2);
EXECUTE call_plan(2, 0);
RESET client_min_messages;
\set VERBOSITY terse
DROP SCHEMA multi_mx_function_call_delegation CASCADE;

View File

@ -140,3 +140,16 @@ BEGIN
EXECUTE p_sql;
PERFORM run_command_on_workers(p_sql);
END;$$;
-- Test helper: points an already-distributed function or procedure at the
-- colocation group of a distributed table.
--   1. Marks the given function/procedure as colocated with the given table.
--   2. Marks the argument index with which we route the call.
-- Parameters:
--   procname       - pg_proc.proname to look up. The match is on the bare
--                    name, so every overload (in any schema) with that name
--                    that has a pg_dist_object row is updated.
--   tablerelid     - table whose pg_dist_partition.colocationid is copied.
--   argument_index - stored as distribution_argument_index. Presumably
--                    0-based: the tests treat -1 and 2 as out of bounds for a
--                    two-argument function -- confirm against the planner.
-- Only existing citus.pg_dist_object rows are updated; when nothing matches
-- the call is a silent no-op.
CREATE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
RETURNS void LANGUAGE plpgsql AS $$
BEGIN
    update citus.pg_dist_object as obj
    set distribution_argument_index = argument_index,
        colocationid = part.colocationid
    from pg_catalog.pg_proc as proc, pg_dist_partition as part
    where proc.proname = procname
      and obj.objid = proc.oid
      -- an OID is only unique within its own catalog, so make sure the
      -- pg_dist_object row really describes a pg_proc entry
      and obj.classid = 'pg_catalog.pg_proc'::regclass
      and part.logicalrelid = tablerelid;
END;$$;