diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index f6911eb70..585456669 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -253,6 +253,31 @@ multi_ProcessUtility(PlannedStmt *pstmt, } #endif + if (IsA(parsetree, DoStmt)) + { + /* + * All statements in a DO block are executed in a single transaciton, + * so we need to keep track of whether we are inside a DO block. + */ + DoBlockLevel += 1; + + PG_TRY(); + { + standard_ProcessUtility(pstmt, queryString, context, + params, queryEnv, dest, completionTag); + + DoBlockLevel -= 1; + } + PG_CATCH(); + { + DoBlockLevel -= 1; + PG_RE_THROW(); + } + PG_END_TRY(); + + return; + } + /* process SET LOCAL stmts of whitelisted GUCs in multi-stmt xacts */ if (IsA(parsetree, VariableSetStmt)) { diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 460d399c1..a7d5c8215 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -87,6 +87,9 @@ bool EnableDeadlockPrevention = true; /* number of nested stored procedure call levels we are currently in */ int StoredProcedureLevel = 0; +/* number of nested DO block levels we are currently in */ +int DoBlockLevel = 0; + /* sort the returning to get consistent outputs */ bool SortReturning = false; diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index c5a8af5f0..10703055a 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -18,6 +18,7 @@ #include "catalog/pg_type.h" #include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodes.h" +#include "distributed/function_call_delegation.h" #include "distributed/insert_select_planner.h" #include "distributed/intermediate_results.h" #include "distributed/metadata_cache.h" @@ -192,6 +193,14 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) AdjustPartitioningForDistributedPlanning(rangeTableList, setPartitionedTablesInherited); } + else + { + DistributedPlan *delegatePlan = TryToDelegateFunctionCall(parse); + if (delegatePlan != NULL) + { + result = FinalizePlan(result, delegatePlan); + } + } } PG_CATCH(); { diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c new file mode 100644 index 000000000..b9bdbb253 --- /dev/null +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -0,0 +1,287 @@ +/*------------------------------------------------------------------------- + * + * function_call_delegation.c + * Planning logic for delegating a function call to a worker when the + * function was distributed with a distribution argument and the worker + * has metadata. + * + * Copyright (c), Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "catalog/pg_proc.h" +#include "catalog/pg_type.h" +#include "commands/defrem.h" +#include "distributed/citus_custom_scan.h" +#include "distributed/citus_ruleutils.h" +#include "distributed/colocation_utils.h" +#include "distributed/commands.h" +#include "distributed/commands/multi_copy.h" +#include "distributed/connection_management.h" +#include "distributed/function_call_delegation.h" +#include "distributed/master_metadata_utility.h" +#include "distributed/master_protocol.h" +#include "distributed/metadata_cache.h" +#include "distributed/multi_executor.h" +#include "distributed/multi_physical_planner.h" +#include "distributed/remote_commands.h" +#include "distributed/shard_pruning.h" +#include "distributed/version_compat.h" +#include "distributed/worker_manager.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "nodes/parsenodes.h" +#include "nodes/primnodes.h" +#include "optimizer/clauses.h" +#include "parser/parse_coerce.h" +#if PG_VERSION_NUM >= 120000 +#include "parser/parsetree.h" +#endif +#include "miscadmin.h" +#include "tcop/dest.h" +#include "utils/lsyscache.h" +#include "utils/syscache.h" + +static bool contain_param_walker(Node *node, void *context); + +/* + * contain_param_walker scans node for Param nodes. + * returns whether any such nodes found. + */ +static bool +contain_param_walker(Node *node, void *context) +{ + return IsA(node, Param); +} + + +/* + * TryToDelegateFunctionCall calls a function on the worker if possible. + * We only support delegating the SELECT func(...) form for distributed + * functions colocated by distributed tables, and not more complicated + * forms involving multiple function calls, FROM clauses, WHERE clauses, + * ... Those complex forms are handled in the coordinator. + */ +DistributedPlan * +TryToDelegateFunctionCall(Query *query) +{ + FromExpr *joinTree = NULL; + List *targetList = NIL; + TargetEntry *targetEntry = NULL; + FuncExpr *funcExpr = NULL; + DistObjectCacheEntry *procedure = NULL; + Oid colocatedRelationId = InvalidOid; + Const *partitionValue = NULL; + Datum partitionValueDatum = 0; + ShardInterval *shardInterval = NULL; + List *placementList = NIL; + DistTableCacheEntry *distTable = NULL; + Var *partitionColumn = NULL; + ShardPlacement *placement = NULL; + WorkerNode *workerNode = NULL; + StringInfo queryString = NULL; + Task *task = NULL; + Job *job = NULL; + DistributedPlan *distributedPlan = NULL; + int32 groupId = 0; + + if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG4)) + { + /* Citus is not ready to determine whether function is distributed */ + return NULL; + } + + groupId = GetLocalGroupId(); + if (groupId != 0 || groupId == GROUP_ID_UPGRADING) + { + /* do not delegate from workers, or while upgrading */ + return NULL; + } + + if (query == NULL) + { + /* no query (mostly here to be defensive) */ + return NULL; + } + + joinTree = query->jointree; + if (joinTree == NULL) + { + /* no join tree (mostly here to be defensive) */ + return NULL; + } + + if (joinTree->quals != NULL) + { + /* query has a WHERE section */ + return NULL; + } + + if (joinTree->fromlist != NIL) + { + /* query has a FROM section */ +#if PG_VERSION_NUM >= 120000 + + /* in pg12 empty FROMs are represented with an RTE_RESULT */ + if (list_length(joinTree->fromlist) == 1) + { + RangeTblRef *reference = linitial(joinTree->fromlist); + RangeTblEntry *rtentry = rt_fetch(reference->rtindex, query->rtable); + if (rtentry->rtekind != RTE_RESULT) + { + return NULL; + } + } + else + { + return NULL; + } +#else + return NULL; +#endif + } + + targetList = query->targetList; + if (list_length(query->targetList) != 1) + { + /* multiple target list items */ + return NULL; + } + + targetEntry = (TargetEntry *) linitial(targetList); + if (!IsA(targetEntry->expr, FuncExpr)) + { + /* target list item is not a function call */ + return NULL; + } + + funcExpr = (FuncExpr *) targetEntry->expr; + procedure = LookupDistObjectCacheEntry(ProcedureRelationId, funcExpr->funcid, 0); + if (procedure == NULL) + { + /* not a distributed function call */ + ereport(DEBUG4, (errmsg("function is not distributed"))); + return NULL; + } + + if (IsMultiStatementTransaction()) + { + /* cannot delegate function calls in a multi-statement transaction */ + ereport(DEBUG1, (errmsg("not pushing down function calls in " + "a multi-statement transaction"))); + return NULL; + } + + if (procedure->distributionArgIndex < 0 || + procedure->distributionArgIndex >= list_length(funcExpr->args)) + { + ereport(DEBUG1, (errmsg("function call does not have a distribution argument"))); + return NULL; + } + + if (contain_volatile_functions((Node *) funcExpr->args)) + { + ereport(DEBUG1, (errmsg("arguments in a distributed function must " + "be constant expressions"))); + return NULL; + } + + if (expression_tree_walker((Node *) funcExpr->args, contain_param_walker, NULL)) + { + ereport(DEBUG1, (errmsg("arguments in a distributed function must " + "not contain subqueries"))); + return NULL; + } + + colocatedRelationId = ColocatedTableId(procedure->colocationId); + if (colocatedRelationId == InvalidOid) + { + ereport(DEBUG1, (errmsg("function does not have co-located tables"))); + return NULL; + } + + distTable = DistributedTableCacheEntry(colocatedRelationId); + partitionColumn = distTable->partitionColumn; + if (partitionColumn == NULL) + { + /* This can happen if colocated with a reference table. Punt for now. */ + ereport(DEBUG1, (errmsg( + "cannnot push down function call for reference tables"))); + return NULL; + } + + partitionValue = (Const *) list_nth(funcExpr->args, procedure->distributionArgIndex); + if (!IsA(partitionValue, Const)) + { + ereport(DEBUG1, (errmsg("distribution argument value must be a constant"))); + return NULL; + } + + partitionValueDatum = partitionValue->constvalue; + + if (partitionValue->consttype != partitionColumn->vartype) + { + CopyCoercionData coercionData; + + ConversionPathForTypes(partitionValue->consttype, partitionColumn->vartype, + &coercionData); + + partitionValueDatum = CoerceColumnValue(partitionValueDatum, &coercionData); + } + + shardInterval = FindShardInterval(partitionValueDatum, distTable); + if (shardInterval == NULL) + { + ereport(DEBUG1, (errmsg("cannot push down call, failed to find shard interval"))); + return NULL; + } + + placementList = FinalizedShardPlacementList(shardInterval->shardId); + if (list_length(placementList) != 1) + { + /* punt on this for now */ + ereport(DEBUG1, (errmsg( + "cannot push down function call for replicated distributed tables"))); + return NULL; + } + + placement = (ShardPlacement *) linitial(placementList); + workerNode = FindWorkerNode(placement->nodeName, placement->nodePort); + + if (workerNode == NULL || !workerNode->hasMetadata || !workerNode->metadataSynced) + { + ereport(DEBUG1, (errmsg("the worker node does not have metadata"))); + return NULL; + } + + ereport(DEBUG1, (errmsg("pushing down the function call"))); + + queryString = makeStringInfo(); + pg_get_query_def(query, queryString); + + task = CitusMakeNode(Task); + task->taskType = SQL_TASK; + task->queryString = queryString->data; + task->taskPlacementList = placementList; + task->anchorShardId = shardInterval->shardId; + task->replicationModel = distTable->replicationModel; + + job = CitusMakeNode(Job); + job->jobId = UniqueJobId(); + job->jobQuery = query; + job->taskList = list_make1(task); + + distributedPlan = CitusMakeNode(DistributedPlan); + distributedPlan->workerJob = job; + distributedPlan->masterQuery = NULL; + distributedPlan->routerExecutable = true; + distributedPlan->hasReturning = false; + + /* worker will take care of any necessary locking, treat query as read-only */ + distributedPlan->modLevel = ROW_MODIFY_READONLY; + + return distributedPlan; +} diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 9168b0cb7..ed0b94c03 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -638,6 +638,11 @@ IsMultiStatementTransaction(void) /* in a BEGIN...END block */ return true; } + else if (DoBlockLevel > 0) + { + /* in (a transaction within) a do block */ + return true; + } else if (StoredProcedureLevel > 0) { /* in (a transaction within) a stored procedure */ diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index fa637c5e7..e455575d3 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -3143,18 +3143,22 @@ GetLocalGroupId(void) tupleDescriptor, &isNull); groupId = DatumGetInt32(groupIdDatum); + + /* set the local cache variable */ + LocalGroupId = groupId; } else { - elog(ERROR, "could not find any entries in pg_dist_local_group"); + /* + * Upgrade is happening. When upgrading postgres, pg_dist_local_group is + * temporarily empty before citus_finish_pg_upgrade() finishes execution. + */ + groupId = GROUP_ID_UPGRADING; } systable_endscan(scanDescriptor); heap_close(pgDistLocalGroupId, AccessShareLock); - /* set the local cache variable */ - LocalGroupId = groupId; - return groupId; } diff --git a/src/include/distributed/function_call_delegation.h b/src/include/distributed/function_call_delegation.h new file mode 100644 index 000000000..830e5e9ea --- /dev/null +++ b/src/include/distributed/function_call_delegation.h @@ -0,0 +1,19 @@ +/* + * function_call_delegation.h + * Declarations for public functions and variables used to delegate + * function calls to worker nodes. + * + * Copyright (c), Citus Data, Inc. + */ + +#ifndef FUNCTION_CALL_DELEGATION_H +#define FUNCTION_CALL_DELEGATION_H + +#include "postgres.h" +#include "distributed/multi_physical_planner.h" + + +DistributedPlan * TryToDelegateFunctionCall(Query *query); + + +#endif /* FUNCTION_CALL_DELEGATION_H */ diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index f63d26816..77280aebf 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -29,6 +29,14 @@ typedef enum } ReadFromSecondariesType; extern int ReadFromSecondaries; + +/* + * While upgrading pg_dist_local_group can be empty temporarily, in that + * case we use GROUP_ID_UPGRADING as the local group id to communicate + * this to other functions. + */ +#define GROUP_ID_UPGRADING -2 + /* * Representation of a table's metadata that is frequently used for * distributed execution. Cached. diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index 70365a9d6..51645dd17 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -91,6 +91,9 @@ extern dlist_head InProgressTransactions; /* number of nested stored procedure call levels we are currently in */ extern int StoredProcedureLevel; +/* number of nested DO block levels we are currently in */ +extern int DoBlockLevel; + /* number of nested function call levels we are currently in */ extern int FunctionCallLevel; diff --git a/src/test/regress/expected/multi_mx_call.out b/src/test/regress/expected/multi_mx_call.out index 36209baf4..55d07c4a2 100644 --- a/src/test/regress/expected/multi_mx_call.out +++ b/src/test/regress/expected/multi_mx_call.out @@ -1,4 +1,6 @@ -- Test passing off CALL to mx workers +create schema multi_mx_call; +set search_path to multi_mx_call, public; -- Create worker-local tables to test procedure calls were routed set citus.shard_replication_factor to 2; set citus.replication_model to 'statement'; @@ -13,21 +15,6 @@ select create_distributed_table('mx_call_dist_table_replica', 'id'); insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5); set citus.shard_replication_factor to 1; set citus.replication_model to 'streaming'; -create schema multi_mx_call; -set search_path to multi_mx_call, public; --- --- Utility UDFs --- --- 1. Marks the given procedure as colocated with the given table. --- 2. Marks the argument index with which we route the procedure. -CREATE PROCEDURE colocate_proc_with_table(procname text, tablerelid regclass, argument_index int) -LANGUAGE plpgsql AS $$ -BEGIN - update citus.pg_dist_object - set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid - from pg_proc, pg_dist_partition - where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid; -END;$$; -- -- Create tables and procedures we want to use in tests -- @@ -96,6 +83,19 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A'); F | S (1 row) +-- Same for unqualified names +call mx_call_proc(2, 0); + y +---- + 29 +(1 row) + +call mx_call_proc_custom_types('S', 'A'); + x | y +---+--- + F | S +(1 row) + -- Mark both procedures as distributed ... select create_distributed_function('mx_call_proc(int,int)'); create_distributed_function @@ -114,10 +114,10 @@ select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_ca SET client_min_messages TO DEBUG1; call multi_mx_call.mx_call_proc(2, 0); DEBUG: stored procedure does not have co-located tables -DEBUG: generating subplan 8_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +DEBUG: generating subplan 10_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment -DEBUG: Plan 8 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment y @@ -133,8 +133,18 @@ DEBUG: stored procedure does not have co-located tables (1 row) -- Mark them as colocated with a table. Now we should route them to workers. -call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); -call multi_mx_call.colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1); +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + +select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + call multi_mx_call.mx_call_proc(2, 0); DEBUG: pushing down the procedure y @@ -149,14 +159,28 @@ DEBUG: pushing down the procedure S | S (1 row) +call mx_call_proc(2, 0); +DEBUG: pushing down the procedure + y +---- + 28 +(1 row) + +call mx_call_proc_custom_types('S', 'A'); +DEBUG: pushing down the procedure + x | y +---+--- + S | S +(1 row) + -- We don't allow distributing calls inside transactions begin; call multi_mx_call.mx_call_proc(2, 0); DEBUG: cannot push down CALL in multi-statement transaction -DEBUG: generating subplan 10_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +DEBUG: generating subplan 12_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment -DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +DEBUG: Plan 12 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment y @@ -179,21 +203,12 @@ DEBUG: stored procedure does not have co-located tables -- Make sure we do bounds checking on distributed argument index -- This also tests that we have cache invalidation for pg_dist_object updates -call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1); -call multi_mx_call.mx_call_proc(2, 0); -DEBUG: cannot push down invalid distribution_argument_index -DEBUG: generating subplan 12_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment -DEBUG: Plan 12 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment - y ----- - 29 +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1); + colocate_proc_with_table +-------------------------- + (1 row) -call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2); call multi_mx_call.mx_call_proc(2, 0); DEBUG: cannot push down invalid distribution_argument_index DEBUG: generating subplan 14_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) @@ -207,10 +222,14 @@ PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment 29 (1 row) --- We don't currently support colocating with reference tables -call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1); +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2); + colocate_proc_with_table +-------------------------- + +(1 row) + call multi_mx_call.mx_call_proc(2, 0); -DEBUG: cannot push down CALL for reference tables +DEBUG: cannot push down invalid distribution_argument_index DEBUG: generating subplan 17_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment @@ -222,10 +241,15 @@ PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment 29 (1 row) --- We don't currently support colocating with replicated tables -call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1); +-- We don't currently support colocating with reference tables +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + call multi_mx_call.mx_call_proc(2, 0); -DEBUG: cannot push down CALL for replicated distributed tables +DEBUG: cannot push down CALL for reference tables DEBUG: generating subplan 19_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment @@ -237,20 +261,45 @@ PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment 29 (1 row) +-- We don't currently support colocating with replicated tables +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + +call multi_mx_call.mx_call_proc(2, 0); +DEBUG: cannot push down CALL for replicated distributed tables +DEBUG: generating subplan 21_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: Plan 21 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('21_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment + y +---- + 29 +(1 row) + SET client_min_messages TO NOTICE; drop table mx_call_dist_table_replica; SET client_min_messages TO DEBUG1; -call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + -- Test that we handle transactional constructs correctly inside a procedure -- that is routed to the workers. CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS $$ BEGIN - INSERT INTO mx_call_dist_table_1 VALUES (x, -1), (x+1, 4); + INSERT INTO multi_mx_call.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4); COMMIT; - UPDATE mx_call_dist_table_1 SET val = val+1 WHERE id >= x; + UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val+1 WHERE id >= x; ROLLBACK; -- Now do the final update! - UPDATE mx_call_dist_table_1 SET val = val-1 WHERE id >= x; + UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val-1 WHERE id >= x; END;$$; -- before distribution ... CALL multi_mx_call.mx_call_proc_tx(10); @@ -265,9 +314,6 @@ DETAIL: A distributed function is created. To make sure subsequent commands see CALL multi_mx_call.mx_call_proc_tx(20); DEBUG: pushing down the procedure -ERROR: relation "mx_call_dist_table_1" does not exist -CONTEXT: while executing command on localhost:57637 -PL/pgSQL function multi_mx_call.mx_call_proc_tx(integer) line 3 at SQL statement SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val; id | val ----+----- @@ -278,7 +324,9 @@ SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val; 9 | 2 10 | -2 11 | 3 -(7 rows) + 20 | -2 + 21 | 3 +(9 rows) -- Test that we properly propagate errors raised from procedures. CREATE PROCEDURE mx_call_proc_raise(x int) LANGUAGE plpgsql AS $$ @@ -316,10 +364,10 @@ select stop_metadata_sync_to_node('localhost', :worker_2_port); call multi_mx_call.mx_call_proc(2, 0); DEBUG: there is no worker node with metadata -DEBUG: generating subplan 25_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +DEBUG: generating subplan 27_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment -DEBUG: Plan 25 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('25_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +DEBUG: Plan 27 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('27_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment y @@ -340,6 +388,11 @@ select start_metadata_sync_to_node('localhost', :worker_2_port); (1 row) +-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make +-- worker backend caches inconsistent. Reconnect to coordinator to use +-- new worker connections, hence new backends. +\c - - - :master_port +SET search_path to multi_mx_call, public; SET client_min_messages TO DEBUG1; -- -- Test non-const parameter values @@ -357,10 +410,10 @@ DETAIL: A distributed function is created. To make sure subsequent commands see -- non-const distribution parameters cannot be pushed down call multi_mx_call.mx_call_proc(2, mx_call_add(3, 4)); DEBUG: distribution argument value must be a constant -DEBUG: generating subplan 27_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +DEBUG: generating subplan 1_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment -DEBUG: Plan 27 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('27_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +DEBUG: Plan 1 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('1_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment y @@ -379,10 +432,10 @@ DEBUG: pushing down the procedure -- volatile parameter cannot be pushed down call multi_mx_call.mx_call_proc(floor(random())::int, 2); DEBUG: arguments in a distributed stored procedure must be constant expressions -DEBUG: generating subplan 29_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +DEBUG: generating subplan 3_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment -DEBUG: Plan 29 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('29_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +DEBUG: Plan 3 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('3_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment y @@ -393,4 +446,4 @@ PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment reset client_min_messages; \set VERBOSITY terse drop schema multi_mx_call cascade; -NOTICE: drop cascades to 10 other objects +NOTICE: drop cascades to 9 other objects diff --git a/src/test/regress/expected/multi_mx_call_0.out b/src/test/regress/expected/multi_mx_call_0.out index a5dac9c72..f16f2d551 100644 --- a/src/test/regress/expected/multi_mx_call_0.out +++ b/src/test/regress/expected/multi_mx_call_0.out @@ -1,4 +1,6 @@ -- Test passing off CALL to mx workers +create schema multi_mx_call; +set search_path to multi_mx_call, public; -- Create worker-local tables to test procedure calls were routed set citus.shard_replication_factor to 2; set citus.replication_model to 'statement'; @@ -13,24 +15,6 @@ select create_distributed_table('mx_call_dist_table_replica', 'id'); insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5); set citus.shard_replication_factor to 1; set citus.replication_model to 'streaming'; -create schema multi_mx_call; -set search_path to multi_mx_call, public; --- --- Utility UDFs --- --- 1. Marks the given procedure as colocated with the given table. --- 2. Marks the argument index with which we route the procedure. -CREATE PROCEDURE colocate_proc_with_table(procname text, tablerelid regclass, argument_index int) -LANGUAGE plpgsql AS $$ -BEGIN - update citus.pg_dist_object - set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid - from pg_proc, pg_dist_partition - where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid; -END;$$; -ERROR: syntax error at or near "PROCEDURE" -LINE 1: CREATE PROCEDURE colocate_proc_with_table(procname text, tab... - ^ -- -- Create tables and procedures we want to use in tests -- @@ -101,6 +85,15 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A'); ERROR: syntax error at or near "call" LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A'); ^ +-- Same for unqualified names +call mx_call_proc(2, 0); +ERROR: syntax error at or near "call" +LINE 1: call mx_call_proc(2, 0); + ^ +call mx_call_proc_custom_types('S', 'A'); +ERROR: syntax error at or near "call" +LINE 1: call mx_call_proc_custom_types('S', 'A'); + ^ -- Mark both procedures as distributed ... select create_distributed_function('mx_call_proc(int,int)'); ERROR: function "mx_call_proc(int,int)" does not exist @@ -122,14 +115,18 @@ ERROR: syntax error at or near "call" LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A'); ^ -- Mark them as colocated with a table. Now we should route them to workers. -call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); -ERROR: syntax error at or near "call" -LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ... - ^ -call multi_mx_call.colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1); -ERROR: syntax error at or near "call" -LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc_cu... - ^ +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + +select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + call multi_mx_call.mx_call_proc(2, 0); ERROR: syntax error at or near "call" LINE 1: call multi_mx_call.mx_call_proc(2, 0); @@ -138,6 +135,14 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A'); ERROR: syntax error at or near "call" LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A'); ^ +call mx_call_proc(2, 0); +ERROR: syntax error at or near "call" +LINE 1: call mx_call_proc(2, 0); + ^ +call mx_call_proc_custom_types('S', 'A'); +ERROR: syntax error at or near "call" +LINE 1: call mx_call_proc_custom_types('S', 'A'); + ^ -- We don't allow distributing calls inside transactions begin; call multi_mx_call.mx_call_proc(2, 0); @@ -156,36 +161,44 @@ LINE 1: call multi_mx_call.mx_call_proc_custom_types('S', 'A'); ^ -- Make sure we do bounds checking on distributed argument index -- This also tests that we have cache invalidation for pg_dist_object updates -call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1); -ERROR: syntax error at or near "call" -LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ... - ^ +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1); + colocate_proc_with_table +-------------------------- + +(1 row) + call multi_mx_call.mx_call_proc(2, 0); ERROR: syntax error at or near "call" LINE 1: call multi_mx_call.mx_call_proc(2, 0); ^ -call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2); -ERROR: syntax error at or near "call" -LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ... - ^ +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2); + colocate_proc_with_table +-------------------------- + +(1 row) + call multi_mx_call.mx_call_proc(2, 0); ERROR: syntax error at or near "call" LINE 1: call multi_mx_call.mx_call_proc(2, 0); ^ -- We don't currently support colocating with reference tables -call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1); -ERROR: syntax error at or near "call" -LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ... - ^ +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + call multi_mx_call.mx_call_proc(2, 0); ERROR: syntax error at or near "call" LINE 1: call multi_mx_call.mx_call_proc(2, 0); ^ -- We don't currently support colocating with replicated tables -call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1); -ERROR: syntax error at or near "call" -LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ... - ^ +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + call multi_mx_call.mx_call_proc(2, 0); ERROR: syntax error at or near "call" LINE 1: call multi_mx_call.mx_call_proc(2, 0); @@ -193,20 +206,22 @@ LINE 1: call multi_mx_call.mx_call_proc(2, 0); SET client_min_messages TO NOTICE; drop table mx_call_dist_table_replica; SET client_min_messages TO DEBUG1; -call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); -ERROR: syntax error at or near "call" -LINE 1: call multi_mx_call.colocate_proc_with_table('mx_call_proc', ... - ^ +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + -- Test that we handle transactional constructs correctly inside a procedure -- that is routed to the workers. CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS $$ BEGIN - INSERT INTO mx_call_dist_table_1 VALUES (x, -1), (x+1, 4); + INSERT INTO multi_mx_call.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4); COMMIT; - UPDATE mx_call_dist_table_1 SET val = val+1 WHERE id >= x; + UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val+1 WHERE id >= x; ROLLBACK; -- Now do the final update! - UPDATE mx_call_dist_table_1 SET val = val-1 WHERE id >= x; + UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val-1 WHERE id >= x; END;$$; ERROR: syntax error at or near "PROCEDURE" LINE 1: CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS ... @@ -282,6 +297,11 @@ select start_metadata_sync_to_node('localhost', :worker_2_port); (1 row) +-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make +-- worker backend caches inconsistent. Reconnect to coordinator to use +-- new worker connections, hence new backends. +\c - - - :master_port +SET search_path to multi_mx_call, public; SET client_min_messages TO DEBUG1; -- -- Test non-const parameter values diff --git a/src/test/regress/expected/multi_mx_function_call_delegation.out b/src/test/regress/expected/multi_mx_function_call_delegation.out new file mode 100644 index 000000000..5c54674c3 --- /dev/null +++ b/src/test/regress/expected/multi_mx_function_call_delegation.out @@ -0,0 +1,539 @@ +-- Test passing off function call to mx workers +CREATE SCHEMA multi_mx_function_call_delegation; +SET search_path TO multi_mx_function_call_delegation, public; +SET citus.shard_replication_factor TO 2; +SET citus.replication_model TO 'statement'; +-- This table requires specific settings, create before getting into things +create table mx_call_dist_table_replica(id int, val int); +select create_distributed_table('mx_call_dist_table_replica', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5); +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; +-- +-- Create tables and functions we want to use in tests +-- +create table mx_call_dist_table_1(id int, val int); +select create_distributed_table('mx_call_dist_table_1', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +insert into mx_call_dist_table_1 values (3,1),(4,5),(9,2),(6,5),(3,5); +create table mx_call_dist_table_2(id int, val int); +select create_distributed_table('mx_call_dist_table_2', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +insert into mx_call_dist_table_2 values (1,1),(1,2),(2,2),(3,3),(3,4); +create table mx_call_dist_table_ref(id int, val int); +select create_reference_table('mx_call_dist_table_ref'); + create_reference_table +------------------------ + +(1 row) + +insert into mx_call_dist_table_ref values (2,7),(1,8),(2,8),(1,8),(2,8); +create type mx_call_enum as enum ('A', 'S', 'D', 'F'); +create table mx_call_dist_table_enum(id int, key mx_call_enum); +select create_distributed_table('mx_call_dist_table_enum', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F'); +CREATE FUNCTION squares(int) RETURNS SETOF RECORD + AS $$ SELECT i, i * i FROM generate_series(1, $1) i $$ + LANGUAGE SQL; +CREATE FUNCTION mx_call_func(x int, INOUT y int) +LANGUAGE plpgsql AS $$ +BEGIN + -- groupid is 0 in coordinator and non-zero in workers, so by using it here + -- we make sure the function is being executed in the worker. + y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group); + -- we also make sure that we can run distributed queries in the functions + -- that are routed to the workers. + y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id); +END;$$; +-- create another function which verifies: +-- 1. we work fine with multiple return columns +-- 2. we work fine in combination with custom types +CREATE FUNCTION mx_call_func_custom_types(INOUT x mx_call_enum, INOUT y mx_call_enum) +LANGUAGE plpgsql AS $$ +BEGIN + y := x; + x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); +END;$$; +-- Test that undistributed functions have no issue executing +select multi_mx_function_call_delegation.mx_call_func(2, 0); + mx_call_func +-------------- + 29 +(1 row) + +select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); + mx_call_func_custom_types +--------------------------- + (F,S) +(1 row) + +select squares(4); + squares +--------- + (1,1) + (2,4) + (3,9) + (4,16) +(4 rows) + +-- Same for unqualified name +select mx_call_func(2, 0); + mx_call_func +-------------- + 29 +(1 row) + +-- Mark both functions as distributed ... +select create_distributed_function('mx_call_func(int,int)'); + create_distributed_function +----------------------------- + +(1 row) + +select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_call_enum)'); + create_distributed_function +----------------------------- + +(1 row) + +select create_distributed_function('squares(int)'); + create_distributed_function +----------------------------- + +(1 row) + +-- We still don't route them to the workers, because they aren't +-- colocated with any distributed tables. +SET client_min_messages TO DEBUG1; +select mx_call_func(2, 0); +DEBUG: function does not have co-located tables +DEBUG: generating subplan 10_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- + 29 +(1 row) + +select mx_call_func_custom_types('S', 'A'); +DEBUG: function does not have co-located tables + mx_call_func_custom_types +--------------------------- + (F,S) +(1 row) + +-- Mark them as colocated with a table. Now we should route them to workers. +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + +select colocate_proc_with_table('mx_call_func_custom_types', 'mx_call_dist_table_enum'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + +select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0); + colocate_proc_with_table +-------------------------- + +(1 row) + +select mx_call_func(2, 0); +DEBUG: pushing down the function call + mx_call_func +-------------- + 28 +(1 row) + +select mx_call_func_custom_types('S', 'A'); +DEBUG: pushing down the function call + mx_call_func_custom_types +--------------------------- + (S,S) +(1 row) + +select squares(4); +DEBUG: pushing down the function call +ERROR: input of anonymous composite types is not implemented +select multi_mx_function_call_delegation.mx_call_func(2, 0); +DEBUG: pushing down the function call + mx_call_func +-------------- + 28 +(1 row) + +select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); +DEBUG: pushing down the function call + mx_call_func_custom_types +--------------------------- + (S,S) +(1 row) + +-- We don't allow distributing calls inside transactions +begin; +select mx_call_func(2, 0); +DEBUG: not pushing down function calls in a multi-statement transaction +DEBUG: generating subplan 12_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 12 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- + 29 +(1 row) + +commit; +-- Drop the table colocated with mx_call_func_custom_types. Now it shouldn't +-- be routed to workers anymore. +SET client_min_messages TO NOTICE; +drop table mx_call_dist_table_enum; +SET client_min_messages TO DEBUG1; +select mx_call_func_custom_types('S', 'A'); +DEBUG: function does not have co-located tables + mx_call_func_custom_types +--------------------------- + (F,S) +(1 row) + +-- Make sure we do bounds checking on distributed argument index +-- This also tests that we have cache invalidation for pg_dist_object updates +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, -1); + colocate_proc_with_table +-------------------------- + +(1 row) + +select mx_call_func(2, 0); +DEBUG: function call does not have a distribution argument +DEBUG: generating subplan 14_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 14 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('14_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- + 29 +(1 row) + +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 2); + colocate_proc_with_table +-------------------------- + +(1 row) + +select mx_call_func(2, 0); +DEBUG: function call does not have a distribution argument +DEBUG: generating subplan 17_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 17 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('17_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- + 29 +(1 row) + +-- We don't currently support colocating with reference tables +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_ref'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + +select mx_call_func(2, 0); +DEBUG: cannnot push down function call for reference tables +DEBUG: generating subplan 19_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 19 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('19_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- + 29 +(1 row) + +-- We don't currently support colocating with replicated tables +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_replica'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + +select mx_call_func(2, 0); +DEBUG: cannot push down function call for replicated distributed tables +DEBUG: generating subplan 21_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 21 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('21_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- + 29 +(1 row) + +SET client_min_messages TO NOTICE; +drop table mx_call_dist_table_replica; +SET client_min_messages TO DEBUG1; +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1); + colocate_proc_with_table +-------------------------- + +(1 row) + +-- Test table returning functions. +CREATE FUNCTION mx_call_func_tbl(x int) +RETURNS TABLE (p0 int, p1 int) +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4); + UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val+1 WHERE id >= x; + UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val-1 WHERE id >= x; + RETURN QUERY + SELECT id, val + FROM multi_mx_function_call_delegation.mx_call_dist_table_1 t + WHERE id >= x + ORDER BY 1, 2; +END;$$; +-- before distribution ... +select mx_call_func_tbl(10); + mx_call_func_tbl +------------------ + (10,-1) + (11,4) +(2 rows) + +-- after distribution ... +select create_distributed_function('mx_call_func_tbl(int)', '$1', 'mx_call_dist_table_1'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +----------------------------- + +(1 row) + +select mx_call_func_tbl(20); +DEBUG: pushing down the function call + mx_call_func_tbl +------------------ + (20,-1) + (21,4) +(2 rows) + +-- Test that we properly propagate errors raised from procedures. +CREATE FUNCTION mx_call_func_raise(x int) +RETURNS void LANGUAGE plpgsql AS $$ +BEGIN + RAISE WARNING 'warning'; + RAISE EXCEPTION 'error'; +END;$$; +select create_distributed_function('mx_call_func_raise(int)', '$1', 'mx_call_dist_table_1'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +----------------------------- + +(1 row) + +select mx_call_func_raise(2); +DEBUG: pushing down the function call +DEBUG: warning +DETAIL: WARNING from localhost:57638 +ERROR: error +CONTEXT: while executing command on localhost:57638 +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_raise(integer) line 4 at RAISE +-- Test that we don't propagate to non-metadata worker nodes +select stop_metadata_sync_to_node('localhost', :worker_1_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + +select stop_metadata_sync_to_node('localhost', :worker_2_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + +select mx_call_func(2, 0); +DEBUG: the worker node does not have metadata +DEBUG: generating subplan 27_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 27 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('27_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- + 29 +(1 row) + +SET client_min_messages TO NOTICE; +select start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +select start_metadata_sync_to_node('localhost', :worker_2_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make +-- worker backend caches inconsistent. Reconnect to coordinator to use +-- new worker connections, hence new backends. +\c - - - :master_port +SET search_path to multi_mx_function_call_delegation, public; +SET client_min_messages TO DEBUG1; +-- +-- Test non-const parameter values +-- +CREATE FUNCTION mx_call_add(int, int) RETURNS int + AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE; +SELECT create_distributed_function('mx_call_add(int,int)', '$1'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +----------------------------- + +(1 row) + +-- subquery parameters cannot be pushed down +select mx_call_func((select x + 1 from mx_call_add(3, 4) x), 2); +DEBUG: arguments in a distributed function must not contain subqueries +DEBUG: generating subplan 1_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 1 query after replacing subqueries and CTEs: SELECT (9 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('1_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- + 35 +(1 row) + +-- volatile parameter cannot be pushed down +select mx_call_func(floor(random())::int, 2); +DEBUG: arguments in a distributed function must be constant expressions +DEBUG: generating subplan 3_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 3 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('3_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- + 27 +(1 row) + +-- test forms we don't distribute +select * from mx_call_func(2, 0); +DEBUG: generating subplan 5_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 5 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('5_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + y +---- + 29 +(1 row) + +select mx_call_func(2, 0) from mx_call_dist_table_1; + mx_call_func +-------------- + 28 + 28 + 28 + 28 + 28 + 28 + 28 + 28 + 28 +(9 rows) + +select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; +DEBUG: generating subplan 8_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 8 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func +-------------- +(0 rows) + +select mx_call_func(2, 0), mx_call_func(0, 2); +DEBUG: generating subplan 10_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: generating subplan 13_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: Plan 13 query after replacing subqueries and CTEs: SELECT (1 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('13_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment + mx_call_func | mx_call_func +--------------+-------------- + 29 | 27 +(1 row) + +DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; +DEBUG: not pushing down function calls in a multi-statement transaction +CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)" +PL/pgSQL function inline_code_block line 1 at PERFORM +SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val; + id | val +----+----- + 40 | -1 + 41 | 4 +(2 rows) + +-- Prepared statements +PREPARE call_plan (int, int) AS SELECT mx_call_func($1, $2); +EXECUTE call_plan(2, 0); +DEBUG: pushing down the function call + mx_call_func +-------------- + 28 +(1 row) + +RESET client_min_messages; +\set VERBOSITY terse +DROP SCHEMA multi_mx_function_call_delegation CASCADE; +NOTICE: drop cascades to 10 other objects diff --git a/src/test/regress/expected/multi_test_helpers.out b/src/test/regress/expected/multi_test_helpers.out index 240872882..799324e22 100644 --- a/src/test/regress/expected/multi_test_helpers.out +++ b/src/test/regress/expected/multi_test_helpers.out @@ -140,3 +140,13 @@ BEGIN EXECUTE p_sql; PERFORM run_command_on_workers(p_sql); END;$$; +-- 1. Marks the given procedure as colocated with the given table. +-- 2. Marks the argument index with which we route the procedure. +CREATE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int) +RETURNS void LANGUAGE plpgsql AS $$ +BEGIN + update citus.pg_dist_object + set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid + from pg_proc, pg_dist_partition + where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid; +END;$$; diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 6e19b1f57..eca2b9f26 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -33,6 +33,7 @@ test: multi_mx_repartition_udt_prepare mx_foreign_key_to_reference_table test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2 test: multi_mx_metadata test: multi_mx_call +test: multi_mx_function_call_delegation test: multi_mx_modifications local_shard_execution test: multi_mx_transaction_recovery test: multi_mx_modifying_xacts diff --git a/src/test/regress/sql/multi_mx_call.sql b/src/test/regress/sql/multi_mx_call.sql index c6caf7698..53883b63e 100644 --- a/src/test/regress/sql/multi_mx_call.sql +++ b/src/test/regress/sql/multi_mx_call.sql @@ -1,5 +1,8 @@ -- Test passing off CALL to mx workers +create schema multi_mx_call; +set search_path to multi_mx_call, public; + -- Create worker-local tables to test procedure calls were routed set citus.shard_replication_factor to 2; @@ -13,25 +16,6 @@ insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5); set citus.shard_replication_factor to 1; set citus.replication_model to 'streaming'; -create schema multi_mx_call; -set search_path to multi_mx_call, public; - --- --- Utility UDFs --- - --- 1. Marks the given procedure as colocated with the given table. --- 2. Marks the argument index with which we route the procedure. -CREATE PROCEDURE colocate_proc_with_table(procname text, tablerelid regclass, argument_index int) -LANGUAGE plpgsql AS $$ -BEGIN - update citus.pg_dist_object - set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid - from pg_proc, pg_dist_partition - where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid; -END;$$; - - -- -- Create tables and procedures we want to use in tests -- @@ -78,6 +62,10 @@ END;$$; call multi_mx_call.mx_call_proc(2, 0); call multi_mx_call.mx_call_proc_custom_types('S', 'A'); +-- Same for unqualified names +call mx_call_proc(2, 0); +call mx_call_proc_custom_types('S', 'A'); + -- Mark both procedures as distributed ... select create_distributed_function('mx_call_proc(int,int)'); select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_call_enum)'); @@ -89,10 +77,13 @@ call multi_mx_call.mx_call_proc(2, 0); call multi_mx_call.mx_call_proc_custom_types('S', 'A'); -- Mark them as colocated with a table. Now we should route them to workers. -call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); -call multi_mx_call.colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1); +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); +select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1); + call multi_mx_call.mx_call_proc(2, 0); call multi_mx_call.mx_call_proc_custom_types('S', 'A'); +call mx_call_proc(2, 0); +call mx_call_proc_custom_types('S', 'A'); -- We don't allow distributing calls inside transactions begin; @@ -108,34 +99,34 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A'); -- Make sure we do bounds checking on distributed argument index -- This also tests that we have cache invalidation for pg_dist_object updates -call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1); +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, -1); call multi_mx_call.mx_call_proc(2, 0); -call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2); +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2); call multi_mx_call.mx_call_proc(2, 0); -- We don't currently support colocating with reference tables -call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1); +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1); call multi_mx_call.mx_call_proc(2, 0); -- We don't currently support colocating with replicated tables -call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1); +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::regclass, 1); call multi_mx_call.mx_call_proc(2, 0); SET client_min_messages TO NOTICE; drop table mx_call_dist_table_replica; SET client_min_messages TO DEBUG1; -call multi_mx_call.colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); -- Test that we handle transactional constructs correctly inside a procedure -- that is routed to the workers. CREATE PROCEDURE mx_call_proc_tx(x int) LANGUAGE plpgsql AS $$ BEGIN - INSERT INTO mx_call_dist_table_1 VALUES (x, -1), (x+1, 4); + INSERT INTO multi_mx_call.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4); COMMIT; - UPDATE mx_call_dist_table_1 SET val = val+1 WHERE id >= x; + UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val+1 WHERE id >= x; ROLLBACK; -- Now do the final update! - UPDATE mx_call_dist_table_1 SET val = val-1 WHERE id >= x; + UPDATE multi_mx_call.mx_call_dist_table_1 SET val = val-1 WHERE id >= x; END;$$; -- before distribution ... @@ -162,6 +153,12 @@ call multi_mx_call.mx_call_proc(2, 0); SET client_min_messages TO NOTICE; select start_metadata_sync_to_node('localhost', :worker_1_port); select start_metadata_sync_to_node('localhost', :worker_2_port); + +-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make +-- worker backend caches inconsistent. Reconnect to coordinator to use +-- new worker connections, hence new backends. +\c - - - :master_port +SET search_path to multi_mx_call, public; SET client_min_messages TO DEBUG1; -- diff --git a/src/test/regress/sql/multi_mx_function_call_delegation.sql b/src/test/regress/sql/multi_mx_function_call_delegation.sql new file mode 100644 index 000000000..ddf92f9bf --- /dev/null +++ b/src/test/regress/sql/multi_mx_function_call_delegation.sql @@ -0,0 +1,198 @@ +-- Test passing off function call to mx workers + +CREATE SCHEMA multi_mx_function_call_delegation; +SET search_path TO multi_mx_function_call_delegation, public; + +SET citus.shard_replication_factor TO 2; +SET citus.replication_model TO 'statement'; + +-- This table requires specific settings, create before getting into things +create table mx_call_dist_table_replica(id int, val int); +select create_distributed_table('mx_call_dist_table_replica', 'id'); +insert into mx_call_dist_table_replica values (9,1),(8,2),(7,3),(6,4),(5,5); + +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; + +-- +-- Create tables and functions we want to use in tests +-- +create table mx_call_dist_table_1(id int, val int); +select create_distributed_table('mx_call_dist_table_1', 'id'); +insert into mx_call_dist_table_1 values (3,1),(4,5),(9,2),(6,5),(3,5); + +create table mx_call_dist_table_2(id int, val int); +select create_distributed_table('mx_call_dist_table_2', 'id'); +insert into mx_call_dist_table_2 values (1,1),(1,2),(2,2),(3,3),(3,4); + +create table mx_call_dist_table_ref(id int, val int); +select create_reference_table('mx_call_dist_table_ref'); +insert into mx_call_dist_table_ref values (2,7),(1,8),(2,8),(1,8),(2,8); + +create type mx_call_enum as enum ('A', 'S', 'D', 'F'); +create table mx_call_dist_table_enum(id int, key mx_call_enum); +select create_distributed_table('mx_call_dist_table_enum', 'key'); +insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F'); + +CREATE FUNCTION squares(int) RETURNS SETOF RECORD + AS $$ SELECT i, i * i FROM generate_series(1, $1) i $$ + LANGUAGE SQL; + +CREATE FUNCTION mx_call_func(x int, INOUT y int) +LANGUAGE plpgsql AS $$ +BEGIN + -- groupid is 0 in coordinator and non-zero in workers, so by using it here + -- we make sure the function is being executed in the worker. + y := x + (select case groupid when 0 then 1 else 0 end from pg_dist_local_group); + -- we also make sure that we can run distributed queries in the functions + -- that are routed to the workers. + y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id); +END;$$; + +-- create another function which verifies: +-- 1. we work fine with multiple return columns +-- 2. we work fine in combination with custom types +CREATE FUNCTION mx_call_func_custom_types(INOUT x mx_call_enum, INOUT y mx_call_enum) +LANGUAGE plpgsql AS $$ +BEGIN + y := x; + x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); +END;$$; + +-- Test that undistributed functions have no issue executing +select multi_mx_function_call_delegation.mx_call_func(2, 0); +select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); +select squares(4); + +-- Same for unqualified name +select mx_call_func(2, 0); + +-- Mark both functions as distributed ... +select create_distributed_function('mx_call_func(int,int)'); +select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_call_enum)'); +select create_distributed_function('squares(int)'); + +-- We still don't route them to the workers, because they aren't +-- colocated with any distributed tables. +SET client_min_messages TO DEBUG1; +select mx_call_func(2, 0); +select mx_call_func_custom_types('S', 'A'); + +-- Mark them as colocated with a table. Now we should route them to workers. +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1); +select colocate_proc_with_table('mx_call_func_custom_types', 'mx_call_dist_table_enum'::regclass, 1); +select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0); + +select mx_call_func(2, 0); +select mx_call_func_custom_types('S', 'A'); +select squares(4); +select multi_mx_function_call_delegation.mx_call_func(2, 0); +select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); + +-- We don't allow distributing calls inside transactions +begin; +select mx_call_func(2, 0); +commit; + +-- Drop the table colocated with mx_call_func_custom_types. Now it shouldn't +-- be routed to workers anymore. +SET client_min_messages TO NOTICE; +drop table mx_call_dist_table_enum; +SET client_min_messages TO DEBUG1; +select mx_call_func_custom_types('S', 'A'); + +-- Make sure we do bounds checking on distributed argument index +-- This also tests that we have cache invalidation for pg_dist_object updates +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, -1); +select mx_call_func(2, 0); +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 2); +select mx_call_func(2, 0); + +-- We don't currently support colocating with reference tables +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_ref'::regclass, 1); +select mx_call_func(2, 0); + +-- We don't currently support colocating with replicated tables +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_replica'::regclass, 1); +select mx_call_func(2, 0); +SET client_min_messages TO NOTICE; +drop table mx_call_dist_table_replica; +SET client_min_messages TO DEBUG1; + +select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass, 1); + +-- Test table returning functions. +CREATE FUNCTION mx_call_func_tbl(x int) +RETURNS TABLE (p0 int, p1 int) +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 VALUES (x, -1), (x+1, 4); + UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val+1 WHERE id >= x; + UPDATE multi_mx_function_call_delegation.mx_call_dist_table_1 SET val = val-1 WHERE id >= x; + RETURN QUERY + SELECT id, val + FROM multi_mx_function_call_delegation.mx_call_dist_table_1 t + WHERE id >= x + ORDER BY 1, 2; +END;$$; + +-- before distribution ... +select mx_call_func_tbl(10); +-- after distribution ... +select create_distributed_function('mx_call_func_tbl(int)', '$1', 'mx_call_dist_table_1'); +select mx_call_func_tbl(20); + +-- Test that we properly propagate errors raised from procedures. +CREATE FUNCTION mx_call_func_raise(x int) +RETURNS void LANGUAGE plpgsql AS $$ +BEGIN + RAISE WARNING 'warning'; + RAISE EXCEPTION 'error'; +END;$$; +select create_distributed_function('mx_call_func_raise(int)', '$1', 'mx_call_dist_table_1'); +select mx_call_func_raise(2); + +-- Test that we don't propagate to non-metadata worker nodes +select stop_metadata_sync_to_node('localhost', :worker_1_port); +select stop_metadata_sync_to_node('localhost', :worker_2_port); +select mx_call_func(2, 0); +SET client_min_messages TO NOTICE; +select start_metadata_sync_to_node('localhost', :worker_1_port); +select start_metadata_sync_to_node('localhost', :worker_2_port); + +-- stop_metadata_sync_to_node()/start_metadata_sync_to_node() might make +-- worker backend caches inconsistent. Reconnect to coordinator to use +-- new worker connections, hence new backends. +\c - - - :master_port +SET search_path to multi_mx_function_call_delegation, public; +SET client_min_messages TO DEBUG1; + +-- +-- Test non-const parameter values +-- +CREATE FUNCTION mx_call_add(int, int) RETURNS int + AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE; +SELECT create_distributed_function('mx_call_add(int,int)', '$1'); + +-- subquery parameters cannot be pushed down +select mx_call_func((select x + 1 from mx_call_add(3, 4) x), 2); + +-- volatile parameter cannot be pushed down +select mx_call_func(floor(random())::int, 2); + +-- test forms we don't distribute +select * from mx_call_func(2, 0); +select mx_call_func(2, 0) from mx_call_dist_table_1; +select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; +select mx_call_func(2, 0), mx_call_func(0, 2); + +DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; +SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val; + +-- Prepared statements +PREPARE call_plan (int, int) AS SELECT mx_call_func($1, $2); +EXECUTE call_plan(2, 0); + +RESET client_min_messages; +\set VERBOSITY terse +DROP SCHEMA multi_mx_function_call_delegation CASCADE; diff --git a/src/test/regress/sql/multi_test_helpers.sql b/src/test/regress/sql/multi_test_helpers.sql index 7582f1ee1..0e853971d 100644 --- a/src/test/regress/sql/multi_test_helpers.sql +++ b/src/test/regress/sql/multi_test_helpers.sql @@ -140,3 +140,16 @@ BEGIN EXECUTE p_sql; PERFORM run_command_on_workers(p_sql); END;$$; + +-- 1. Marks the given procedure as colocated with the given table. +-- 2. Marks the argument index with which we route the procedure. +CREATE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int) +RETURNS void LANGUAGE plpgsql AS $$ +BEGIN + update citus.pg_dist_object + set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid + from pg_proc, pg_dist_partition + where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid; +END;$$; + +