Merge pull request #5602 from citusdata/marcocitus/disallow-remote-execution

pull/5520/head^2
Marco Slot 2022-01-07 18:02:13 +01:00 committed by GitHub
commit 73a76b876a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 531 additions and 117 deletions

View File

@ -24,6 +24,7 @@
#include "distributed/commands/utility_hook.h"
#include "distributed/connection_management.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/function_call_delegation.h"
#include "distributed/metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
@ -46,9 +47,10 @@
#include "utils/lsyscache.h"
#include "utils/syscache.h"
static bool CallFuncExprRemotely(CallStmt *callStmt,
DistObjectCacheEntry *procedure,
FuncExpr *funcExpr, DestReceiver *dest);
/* global variable tracking whether we are in a delegated procedure call */
bool InDelegatedProcedureCall = false;
/*
* CallDistributedProcedureRemotely calls a stored procedure on the worker if possible.
@ -61,28 +63,21 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest)
DistObjectCacheEntry *procedure = LookupDistObjectCacheEntry(ProcedureRelationId,
functionId, 0);
/*
* If procedure is not distributed or already delegated from another
* node, do not call the procedure remotely.
*/
if (procedure == NULL || !procedure->isDistributed ||
IsCitusInitiatedRemoteBackend())
if (procedure == NULL || !procedure->isDistributed)
{
return false;
}
return CallFuncExprRemotely(callStmt, procedure, funcExpr, dest);
}
if (IsCitusInitiatedRemoteBackend())
{
/*
* We are in a citus-initiated backend handling a CALL to a distributed
* procedure. That means that this is the delegated call.
*/
InDelegatedProcedureCall = true;
return false;
}
/*
* CallFuncExprRemotely calls a procedure of function on the worker if possible.
*/
static bool
CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
FuncExpr *funcExpr, DestReceiver *dest)
{
if (IsMultiStatementTransaction())
{
ereport(DEBUG1, (errmsg("cannot push down CALL in multi-statement transaction")));
@ -102,6 +97,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
"be constant expressions")));
return false;
}
CitusTableCacheEntry *distTable = GetCitusTableCacheEntry(colocatedRelationId);
Var *partitionColumn = distTable->partitionColumn;
bool colocatedWithReferenceTable = false;

View File

@ -3340,6 +3340,7 @@ InitializeCopyShardState(CopyShardState *shardState,
{
ListCell *placementCell = NULL;
int failedPlacementCount = 0;
bool hasRemoteCopy = false;
MemoryContext localContext =
AllocSetContextCreateExtended(CurrentMemoryContext,
@ -3383,6 +3384,8 @@ InitializeCopyShardState(CopyShardState *shardState,
continue;
}
hasRemoteCopy = true;
MultiConnection *connection =
CopyGetPlacementConnection(connectionStateHash, placement,
colocatedIntermediateResult);
@ -3427,6 +3430,11 @@ InitializeCopyShardState(CopyShardState *shardState,
ereport(ERROR, (errmsg("could not connect to any active placements")));
}
if (hasRemoteCopy)
{
EnsureRemoteTaskExecutionAllowed();
}
/*
* We just error out and code execution should never reach to this
* point. This is the case for all tables.

View File

@ -227,10 +227,20 @@ multi_ProcessUtility(PlannedStmt *pstmt,
params, queryEnv, dest, completionTag);
StoredProcedureLevel -= 1;
if (InDelegatedProcedureCall && StoredProcedureLevel == 0)
{
InDelegatedProcedureCall = false;
}
}
PG_CATCH();
{
StoredProcedureLevel -= 1;
if (InDelegatedProcedureCall && StoredProcedureLevel == 0)
{
InDelegatedProcedureCall = false;
}
PG_RE_THROW();
}
PG_END_TRY();

View File

@ -1290,6 +1290,12 @@ StartDistributedExecution(DistributedExecution *execution)
*/
RecordParallelRelationAccessForTaskList(execution->remoteAndLocalTaskList);
}
/* make sure we are not doing remote execution from within a task */
if (execution->remoteTaskList != NIL)
{
EnsureRemoteTaskExecutionAllowed();
}
}

View File

@ -21,9 +21,11 @@
#include "distributed/citus_custom_scan.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/function_call_delegation.h"
#include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/multi_executor.h"
#include "distributed/combine_query_planner.h"
@ -719,3 +721,46 @@ ExecutorBoundParams(void)
Assert(ExecutorLevel > 0);
return executorBoundParams;
}
/*
* EnsureRemoteTaskExecutionAllowed ensures that we do not perform remote
* execution from within a task. That could happen when the user calls
* a function in a query that gets pushed down to the worker, and the
* function performs a query on a distributed table.
*/
void
EnsureRemoteTaskExecutionAllowed(void)
{
if (!InTaskExecution())
{
/* we are not within a task, distributed execution is allowed */
return;
}
ereport(ERROR, (errmsg("cannot execute a distributed query from a query on a "
"shard")));
}
/*
* InTaskExecution determines whether we are currently in a task execution.
*/
bool
InTaskExecution(void)
{
if (LocalExecutorLevel > 0)
{
/* in a local task */
return true;
}
/*
* Normally, any query execution within a citus-initiated backend
* is considered a task execution, but an exception is when we
* are in a delegated function/procedure call.
*/
return IsCitusInitiatedRemoteBackend() &&
!InDelegatedFunctionCall &&
!InDelegatedProcedureCall;
}

View File

@ -58,6 +58,10 @@ struct ParamWalkerContext
static bool contain_param_walker(Node *node, void *context);
/* global variable keeping track of whether we are in a delegated function call */
bool InDelegatedFunctionCall = false;
/*
* contain_param_walker scans node for Param nodes.
* Ignore the return value, instead check context afterwards.
@ -112,15 +116,6 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
}
int32 localGroupId = GetLocalGroupId();
if (localGroupId != COORDINATOR_GROUP_ID && IsCitusInitiatedRemoteBackend())
{
/*
* Do not delegate from workers if it is initiated by Citus already.
* It means that this function has already been delegated to this node.
*/
return NULL;
}
if (localGroupId == GROUP_ID_UPGRADING)
{
/* do not delegate while upgrading */
@ -218,6 +213,27 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
ereport(DEBUG4, (errmsg("function is distributed")));
}
if (IsCitusInitiatedRemoteBackend())
{
/*
* We are planning a call to a distributed function within a Citus backend,
* that means that this is the delegated call.
*/
InDelegatedFunctionCall = true;
return NULL;
}
if (localGroupId != COORDINATOR_GROUP_ID)
{
/*
* We are calling a distributed function on a worker node. We currently
* only delegate from the coordinator.
*
* TODO: remove this restriction.
*/
return NULL;
}
/*
* Cannot delegate functions for INSERT ... SELECT func(), since they require
* coordinated transactions.

View File

@ -113,6 +113,12 @@ start_session_level_connection_to_node(PG_FUNCTION_ARGS)
elog(ERROR, "failed to connect to %s:%d", nodeNameString, (int) nodePort);
}
/* pretend we are a regular client to avoid citus-initiated backend checks */
const char *setAppName =
"SET application_name TO run_commands_on_session_level_connection_to_node";
ExecuteCriticalRemoteCommand(singleConnection, setAppName);
PG_RETURN_VOID();
}

View File

@ -23,6 +23,7 @@
#include "distributed/citus_safe_lib.h"
#include "distributed/connection_management.h"
#include "distributed/distributed_planner.h"
#include "distributed/function_call_delegation.h"
#include "distributed/hash_helpers.h"
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h"
@ -550,6 +551,7 @@ ResetGlobalVariables()
ShouldCoordinatedTransactionUse2PC = false;
TransactionModifiedNodeMetadata = false;
MetadataSyncOnCommit = false;
InDelegatedFunctionCall = false;
ResetWorkerErrorIndication();
}

View File

@ -18,6 +18,7 @@
#include "tcop/utility.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/function_call_delegation.h"
#include "distributed/version_compat.h"
#include "distributed/worker_transaction.h"
@ -37,6 +38,7 @@ extern bool EnableAlterRolePropagation;
extern bool EnableAlterRoleSetPropagation;
extern bool EnableAlterDatabaseOwner;
extern int UtilityHookLevel;
extern bool InDelegatedProcedureCall;
/*

View File

@ -15,6 +15,14 @@
#include "distributed/multi_physical_planner.h"
/*
* These flags keep track of whether the process is currently in a delegated
* function or procedure call.
*/
extern bool InDelegatedFunctionCall;
extern bool InDelegatedProcedureCall;
PlannedStmt * TryToDelegateFunctionCall(DistributedPlanningContext *planContext);

View File

@ -149,6 +149,8 @@ extern void ExtractParametersFromParamList(ParamListInfo paramListInfo,
const char ***parameterValues, bool
useOriginalCustomTypeOids);
extern ParamListInfo ExecutorBoundParams(void);
extern void EnsureRemoteTaskExecutionAllowed(void);
extern bool InTaskExecution(void);
#endif /* MULTI_EXECUTOR_H */

View File

@ -392,6 +392,13 @@ BEGIN
RETURN NEW;
END;
$insert_100$ LANGUAGE plpgsql;
CREATE TABLE local_table (value int);
CREATE FUNCTION insert_100_local() RETURNS trigger AS $insert_100$
BEGIN
INSERT INTO local_table VALUES (100);
RETURN NEW;
END;
$insert_100$ LANGUAGE plpgsql;
BEGIN;
CREATE TRIGGER insert_100_trigger
AFTER TRUNCATE ON another_citus_local_table
@ -416,6 +423,7 @@ NOTICE: executing the command locally: SELECT value FROM citus_local_table_trig
(2 rows)
ROLLBACK;
-- cannot perform remote execution from a trigger on a Citus local table
BEGIN;
-- update should actually update something to test ON UPDATE CASCADE logic
INSERT INTO another_citus_local_table VALUES (600);
@ -436,11 +444,70 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1
FOR EACH STATEMENT EXECUTE FUNCTION insert_100();')
UPDATE another_citus_local_table SET value=value-1;;
NOTICE: executing the command locally: UPDATE citus_local_table_triggers.another_citus_local_table_1507009 another_citus_local_table SET value = (value OPERATOR(pg_catalog.-) 1)
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.reference_table_1507010 (value) VALUES (100)
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.reference_table_1507010 (value) VALUES (100)
ERROR: cannot execute a distributed query from a query on a shard
ROLLBACK;
-- can perform regular execution from a trigger on a Citus local table
BEGIN;
-- update should actually update something to test ON UPDATE CASCADE logic
INSERT INTO another_citus_local_table VALUES (600);
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.another_citus_local_table_1507009 (value) VALUES (600)
INSERT INTO citus_local_table VALUES (600);
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.citus_local_table_1507001 (value) VALUES (600)
CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON another_citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507009, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON another_citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();')
CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507001, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();')
UPDATE another_citus_local_table SET value=value-1;;
NOTICE: executing the command locally: UPDATE citus_local_table_triggers.another_citus_local_table_1507009 another_citus_local_table SET value = (value OPERATOR(pg_catalog.-) 1)
-- we should see two rows with "100"
SELECT * FROM reference_table;
NOTICE: executing the command locally: SELECT value FROM citus_local_table_triggers.reference_table_1507010 reference_table
SELECT * FROM local_table;
value
---------------------------------------------------------------------
100
100
(2 rows)
ROLLBACK;
-- can perform local execution from a trigger on a Citus local table
BEGIN;
SELECT citus_add_local_table_to_metadata('local_table');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
-- update should actually update something to test ON UPDATE CASCADE logic
INSERT INTO another_citus_local_table VALUES (600);
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.another_citus_local_table_1507009 (value) VALUES (600)
INSERT INTO citus_local_table VALUES (600);
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.citus_local_table_1507001 (value) VALUES (600)
CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON another_citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507009, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON another_citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();')
CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507001, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();')
UPDATE another_citus_local_table SET value=value-1;;
NOTICE: executing the command locally: UPDATE citus_local_table_triggers.another_citus_local_table_1507009 another_citus_local_table SET value = (value OPERATOR(pg_catalog.-) 1)
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.local_table_1507011 (value) VALUES (100)
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.local_table_1507011 (value) VALUES (100)
-- we should see two rows with "100"
SELECT * FROM local_table;
NOTICE: executing the command locally: SELECT value FROM citus_local_table_triggers.local_table_1507011 local_table
value
---------------------------------------------------------------------
100
@ -456,11 +523,11 @@ CREATE TABLE par_another_citus_local_table_1 PARTITION OF par_another_citus_loca
ALTER TABLE par_another_citus_local_table ADD CONSTRAINT fkey_self FOREIGN KEY(val) REFERENCES par_another_citus_local_table(val);
ALTER TABLE par_citus_local_table ADD CONSTRAINT fkey_c_to_c FOREIGN KEY(val) REFERENCES par_another_citus_local_table(val) ON UPDATE CASCADE;
SELECT citus_add_local_table_to_metadata('par_another_citus_local_table', cascade_via_foreign_keys=>true);
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507011, 'citus_local_table_triggers', 1507012, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_another_citus_local_table_1 FOR VALUES FROM (1) TO (10000);')
NOTICE: executing the command locally: SELECT pg_catalog.citus_run_local_command($$SELECT worker_fix_partition_shard_index_names('citus_local_table_triggers.par_another_citus_local_table_val_key_1507011'::regclass, 'citus_local_table_triggers.par_another_citus_local_table_1_1507012', 'par_another_citus_local_table_1_val_key_1507012')$$)
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507013, 'citus_local_table_triggers', 1507014, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_citus_local_table_1 FOR VALUES FROM (1) TO (10000);')
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507011, 'citus_local_table_triggers', 1507011, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ADD CONSTRAINT fkey_self FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val)')
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507013, 'citus_local_table_triggers', 1507011, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ADD CONSTRAINT fkey_c_to_c FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val) ON UPDATE CASCADE')
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507012, 'citus_local_table_triggers', 1507013, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_another_citus_local_table_1 FOR VALUES FROM (1) TO (10000);')
NOTICE: executing the command locally: SELECT pg_catalog.citus_run_local_command($$SELECT worker_fix_partition_shard_index_names('citus_local_table_triggers.par_another_citus_local_table_val_key_1507012'::regclass, 'citus_local_table_triggers.par_another_citus_local_table_1_1507013', 'par_another_citus_local_table_1_val_key_1507013')$$)
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507014, 'citus_local_table_triggers', 1507015, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_citus_local_table_1 FOR VALUES FROM (1) TO (10000);')
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507012, 'citus_local_table_triggers', 1507012, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ADD CONSTRAINT fkey_self FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val)')
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507014, 'citus_local_table_triggers', 1507012, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ADD CONSTRAINT fkey_c_to_c FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val) ON UPDATE CASCADE')
citus_add_local_table_to_metadata
---------------------------------------------------------------------
@ -489,7 +556,7 @@ BEGIN;
TRUNCATE par_another_citus_local_table CASCADE;
NOTICE: truncate cascades to table "par_citus_local_table"
NOTICE: truncate cascades to table "par_citus_local_table_1"
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507015 (val) VALUES (100)
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507016 (val) VALUES (100)
NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_triggers.par_another_citus_local_table_xxxxx CASCADE
NOTICE: truncate cascades to table "par_citus_local_table_xxxxx"
NOTICE: truncate cascades to table "par_citus_local_table_1_xxxxx"
@ -497,12 +564,12 @@ NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_trigger
NOTICE: truncate cascades to table "par_citus_local_table_xxxxx"
NOTICE: truncate cascades to table "par_citus_local_table_1_xxxxx"
NOTICE: truncate cascades to table "par_another_citus_local_table_xxxxx"
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507015 (val) VALUES (100)
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507016 (val) VALUES (100)
NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_triggers.par_citus_local_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_triggers.par_citus_local_table_1_xxxxx CASCADE
-- we should see two rows with "100"
SELECT * FROM par_reference_table;
NOTICE: executing the command locally: SELECT val FROM citus_local_table_triggers.par_reference_table_1507015 par_reference_table
NOTICE: executing the command locally: SELECT val FROM citus_local_table_triggers.par_reference_table_1507016 par_reference_table
val
---------------------------------------------------------------------
100
@ -512,4 +579,4 @@ NOTICE: executing the command locally: SELECT val FROM citus_local_table_trigge
ROLLBACK;
-- cleanup at exit
DROP SCHEMA citus_local_table_triggers, "interesting!schema" CASCADE;
NOTICE: drop cascades to 20 other objects
NOTICE: drop cascades to 22 other objects

View File

@ -66,16 +66,10 @@ ALTER FOREIGN TABLE public.foreign_table_newname ADD CONSTRAINT check_c_2 check(
ALTER FOREIGN TABLE public.foreign_table_newname VALIDATE CONSTRAINT check_c_2;
ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint IF EXISTS check_c_2;
-- trigger test
CREATE TABLE distributed_table(value int);
SELECT create_distributed_table('distributed_table', 'value');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table42(value int);
CREATE FUNCTION insert_42() RETURNS trigger AS $insert_42$
BEGIN
INSERT INTO distributed_table VALUES (42);
INSERT INTO table42 VALUES (42);
RETURN NEW;
END;
$insert_42$ LANGUAGE plpgsql;
@ -85,7 +79,7 @@ FOR EACH ROW EXECUTE FUNCTION insert_42();
-- do the same pattern from the workers as well
INSERT INTO public.foreign_table_newname VALUES (99, 'test_2');
delete from public.foreign_table_newname where id_test = 99;
select * from distributed_table ORDER BY value;
select * from table42 ORDER BY value;
value
---------------------------------------------------------------------
42
@ -96,7 +90,7 @@ alter foreign table public.foreign_table_newname disable trigger insert_42_trigg
INSERT INTO public.foreign_table_newname VALUES (99, 'test_2');
delete from public.foreign_table_newname where id_test = 99;
-- should not insert again as trigger disabled
select * from distributed_table ORDER BY value;
select * from table42 ORDER BY value;
value
---------------------------------------------------------------------
42

View File

@ -131,6 +131,12 @@ BEGIN
y := x;
x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
END;$$;
CREATE PROCEDURE mx_call_proc_copy(x int)
LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO multi_mx_call.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s;
END;$$;
-- Test that undistributed procedures have no issue executing
call multi_mx_call.mx_call_proc(2, 0);
y
@ -144,6 +150,7 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A');
F | S
(1 row)
call multi_mx_call.mx_call_proc_copy(2);
-- Same for unqualified names
call mx_call_proc(2, 0);
y
@ -176,6 +183,12 @@ select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_ca
(1 row)
select create_distributed_function('mx_call_proc_copy(int)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
-- We still don't route them to the workers, because they aren't
-- colocated with any distributed tables.
SET client_min_messages TO DEBUG1;
@ -206,6 +219,12 @@ DEBUG: stored procedure does not have co-located tables
F | S
(1 row)
call multi_mx_call.mx_call_proc_copy(2);
DEBUG: stored procedure does not have co-located tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
CONTEXT: SQL statement "INSERT INTO multi_mx_call.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s"
PL/pgSQL function mx_call_proc_copy(integer) line XX at SQL statement
-- Mark them as colocated with a table. Now we should route them to workers.
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
colocate_proc_with_table
@ -225,6 +244,12 @@ select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table
(1 row)
select colocate_proc_with_table('mx_call_proc_copy', 'mx_call_dist_table_1'::regclass, 0);
colocate_proc_with_table
---------------------------------------------------------------------
(1 row)
call multi_mx_call.mx_call_proc(2, 0);
DEBUG: pushing down the procedure
y
@ -253,6 +278,8 @@ DEBUG: pushing down the procedure
S | S
(1 row)
call mx_call_proc_copy(2);
DEBUG: pushing down the procedure
-- Test implicit cast of int to bigint
call mx_call_proc_bigint(4, 2);
DEBUG: pushing down the procedure
@ -398,18 +425,51 @@ DETAIL: A distributed function is created. To make sure subsequent commands see
CALL multi_mx_call.mx_call_proc_tx(20);
DEBUG: pushing down the procedure
SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val;
id | val
id | val
---------------------------------------------------------------------
3 | 1
3 | 5
4 | 5
6 | 5
9 | 2
10 | -2
11 | 3
20 | -2
21 | 3
(9 rows)
3 | 1
3 | 5
4 | 5
6 | 5
9 | 2
10 | -2
11 | 3
20 | -2
21 | 3
100 | 98
100 | 98
100 | 98
101 | 99
101 | 99
101 | 99
102 | 100
102 | 100
102 | 100
103 | 101
103 | 101
103 | 101
104 | 102
104 | 102
104 | 102
105 | 103
105 | 103
105 | 103
106 | 104
106 | 104
106 | 104
107 | 105
107 | 105
107 | 105
108 | 106
108 | 106
108 | 106
109 | 107
109 | 107
109 | 107
110 | 108
110 | 108
110 | 108
(42 rows)
-- Show that function delegation works from worker nodes as well
\c - - - :worker_1_port
@ -539,4 +599,4 @@ PL/pgSQL function mx_call_proc(integer,integer) line XX at assignment
reset client_min_messages;
\set VERBOSITY terse
drop schema multi_mx_call cascade;
NOTICE: drop cascades to 13 other objects
NOTICE: drop cascades to 14 other objects

View File

@ -131,6 +131,12 @@ BEGIN
y := x;
x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
END;$$;
CREATE PROCEDURE mx_call_proc_copy(x int)
LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO multi_mx_call.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s;
END;$$;
-- Test that undistributed procedures have no issue executing
call multi_mx_call.mx_call_proc(2, 0);
y
@ -144,6 +150,7 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A');
F | S
(1 row)
call multi_mx_call.mx_call_proc_copy(2);
-- Same for unqualified names
call mx_call_proc(2, 0);
y
@ -176,6 +183,12 @@ select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_ca
(1 row)
select create_distributed_function('mx_call_proc_copy(int)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
-- We still don't route them to the workers, because they aren't
-- colocated with any distributed tables.
SET client_min_messages TO DEBUG1;
@ -206,6 +219,12 @@ DEBUG: stored procedure does not have co-located tables
F | S
(1 row)
call multi_mx_call.mx_call_proc_copy(2);
DEBUG: stored procedure does not have co-located tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
CONTEXT: SQL statement "INSERT INTO multi_mx_call.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s"
PL/pgSQL function mx_call_proc_copy(integer) line XX at SQL statement
-- Mark them as colocated with a table. Now we should route them to workers.
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
colocate_proc_with_table
@ -225,6 +244,12 @@ select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table
(1 row)
select colocate_proc_with_table('mx_call_proc_copy', 'mx_call_dist_table_1'::regclass, 0);
colocate_proc_with_table
---------------------------------------------------------------------
(1 row)
call multi_mx_call.mx_call_proc(2, 0);
DEBUG: pushing down the procedure
y
@ -253,6 +278,8 @@ DEBUG: pushing down the procedure
S | S
(1 row)
call mx_call_proc_copy(2);
DEBUG: pushing down the procedure
-- Test implicit cast of int to bigint
call mx_call_proc_bigint(4, 2);
DEBUG: pushing down the procedure
@ -398,18 +425,51 @@ DETAIL: A distributed function is created. To make sure subsequent commands see
CALL multi_mx_call.mx_call_proc_tx(20);
DEBUG: pushing down the procedure
SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val;
id | val
id | val
---------------------------------------------------------------------
3 | 1
3 | 5
4 | 5
6 | 5
9 | 2
10 | -2
11 | 3
20 | -2
21 | 3
(9 rows)
3 | 1
3 | 5
4 | 5
6 | 5
9 | 2
10 | -2
11 | 3
20 | -2
21 | 3
100 | 98
100 | 98
100 | 98
101 | 99
101 | 99
101 | 99
102 | 100
102 | 100
102 | 100
103 | 101
103 | 101
103 | 101
104 | 102
104 | 102
104 | 102
105 | 103
105 | 103
105 | 103
106 | 104
106 | 104
106 | 104
107 | 105
107 | 105
107 | 105
108 | 106
108 | 106
108 | 106
109 | 107
109 | 107
109 | 107
110 | 108
110 | 108
110 | 108
(42 rows)
-- Show that function delegation works from worker nodes as well
\c - - - :worker_1_port
@ -539,4 +599,4 @@ PL/pgSQL function mx_call_proc(integer,integer) line XX at assignment
reset client_min_messages;
\set VERBOSITY terse
drop schema multi_mx_call cascade;
NOTICE: drop cascades to 13 other objects
NOTICE: drop cascades to 14 other objects

View File

@ -83,6 +83,16 @@ BEGIN
y := x;
x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
END;$$;
-- function which internally uses COPY protocol without remote execution
CREATE FUNCTION mx_call_func_copy(x int)
RETURNS bool
LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s;
RETURN true;
END;$$;
-- Test that undistributed functions have no issue executing
select multi_mx_function_call_delegation.mx_call_func(2, 0);
mx_call_func
@ -96,6 +106,9 @@ select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
(F,S)
(1 row)
select multi_mx_function_call_delegation.mx_call_copy(2);
ERROR: function multi_mx_function_call_delegation.mx_call_copy(integer) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
select squares(4);
squares
---------------------------------------------------------------------
@ -131,6 +144,12 @@ select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_ca
(1 row)
select create_distributed_function('mx_call_func_copy(int)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
select create_distributed_function('squares(int)');
create_distributed_function
---------------------------------------------------------------------
@ -604,20 +623,6 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
29
(1 row)
select mx_call_func(2, 0) from mx_call_dist_table_1;
mx_call_func
---------------------------------------------------------------------
28
28
28
28
28
28
28
28
28
(9 rows)
select mx_call_func(2, 0) where mx_call_func(0, 2) = 0;
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
@ -647,6 +652,24 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
29 | 27
(1 row)
-- we do not delegate the call, but do push down the query
-- that result in remote execution from workers
select mx_call_func(id, 0) from mx_call_dist_table_1;
ERROR: cannot execute a distributed query from a query on a shard
CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment
while executing command on localhost:xxxxx
select mx_call_func(2, 0) from mx_call_dist_table_1 where id = 3;
ERROR: cannot execute a distributed query from a query on a shard
CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment
while executing command on localhost:xxxxx
select mx_call_func_copy(2) from mx_call_dist_table_1 where id = 3;
ERROR: cannot execute a distributed query from a query on a shard
CONTEXT: SQL statement "INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s"
PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_copy(integer) line XX at SQL statement
while executing command on localhost:xxxxx
DO $$ BEGIN perform mx_call_func_tbl(40); END; $$;
DEBUG: not pushing down function calls in a multi-statement transaction
CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)"
@ -725,7 +748,12 @@ HINT: Connect to the coordinator and run it again.
-- show that functions can be delegated from worker nodes
SET client_min_messages TO DEBUG1;
SELECT mx_call_func(2, 0);
DEBUG: pushing down the function call
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((2 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
mx_call_func
---------------------------------------------------------------------
28
@ -736,4 +764,4 @@ SET search_path TO multi_mx_function_call_delegation, public;
RESET client_min_messages;
\set VERBOSITY terse
DROP SCHEMA multi_mx_function_call_delegation CASCADE;
NOTICE: drop cascades to 14 other objects
NOTICE: drop cascades to 15 other objects

View File

@ -83,6 +83,16 @@ BEGIN
y := x;
x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
END;$$;
-- function which internally uses COPY protocol without remote execution
CREATE FUNCTION mx_call_func_copy(x int)
RETURNS bool
LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s;
RETURN true;
END;$$;
-- Test that undistributed functions have no issue executing
select multi_mx_function_call_delegation.mx_call_func(2, 0);
mx_call_func
@ -96,6 +106,9 @@ select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
(F,S)
(1 row)
select multi_mx_function_call_delegation.mx_call_copy(2);
ERROR: function multi_mx_function_call_delegation.mx_call_copy(integer) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
select squares(4);
squares
---------------------------------------------------------------------
@ -131,6 +144,12 @@ select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_ca
(1 row)
select create_distributed_function('mx_call_func_copy(int)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
select create_distributed_function('squares(int)');
create_distributed_function
---------------------------------------------------------------------
@ -604,20 +623,6 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
29
(1 row)
select mx_call_func(2, 0) from mx_call_dist_table_1;
mx_call_func
---------------------------------------------------------------------
28
28
28
28
28
28
28
28
28
(9 rows)
select mx_call_func(2, 0) where mx_call_func(0, 2) = 0;
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
@ -647,6 +652,24 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
29 | 27
(1 row)
-- we do not delegate the call, but do push down the query
-- that result in remote execution from workers
select mx_call_func(id, 0) from mx_call_dist_table_1;
ERROR: cannot execute a distributed query from a query on a shard
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment
while executing command on localhost:xxxxx
select mx_call_func(2, 0) from mx_call_dist_table_1 where id = 3;
ERROR: cannot execute a distributed query from a query on a shard
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment
while executing command on localhost:xxxxx
select mx_call_func_copy(2) from mx_call_dist_table_1 where id = 3;
ERROR: cannot execute a distributed query from a query on a shard
CONTEXT: SQL statement "INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s"
PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_copy(integer) line XX at SQL statement
while executing command on localhost:xxxxx
DO $$ BEGIN perform mx_call_func_tbl(40); END; $$;
DEBUG: not pushing down function calls in a multi-statement transaction
CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)"
@ -725,7 +748,12 @@ HINT: Connect to the coordinator and run it again.
-- show that functions can be delegated from worker nodes
SET client_min_messages TO DEBUG1;
SELECT mx_call_func(2, 0);
DEBUG: pushing down the function call
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (2 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
mx_call_func
---------------------------------------------------------------------
28
@ -736,4 +764,4 @@ SET search_path TO multi_mx_function_call_delegation, public;
RESET client_min_messages;
\set VERBOSITY terse
DROP SCHEMA multi_mx_function_call_delegation CASCADE;
NOTICE: drop cascades to 14 other objects
NOTICE: drop cascades to 15 other objects

View File

@ -268,6 +268,15 @@ BEGIN
END;
$insert_100$ LANGUAGE plpgsql;
CREATE TABLE local_table (value int);
CREATE FUNCTION insert_100_local() RETURNS trigger AS $insert_100$
BEGIN
INSERT INTO local_table VALUES (100);
RETURN NEW;
END;
$insert_100$ LANGUAGE plpgsql;
BEGIN;
CREATE TRIGGER insert_100_trigger
AFTER TRUNCATE ON another_citus_local_table
@ -282,7 +291,7 @@ BEGIN;
SELECT * FROM reference_table;
ROLLBACK;
-- cannot perform remote execution from a trigger on a Citus local table
BEGIN;
-- update should actually update something to test ON UPDATE CASCADE logic
INSERT INTO another_citus_local_table VALUES (600);
@ -296,9 +305,47 @@ BEGIN;
AFTER UPDATE ON citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100();
UPDATE another_citus_local_table SET value=value-1;;
ROLLBACK;
-- can perform regular execution from a trigger on a Citus local table
BEGIN;
-- update should actually update something to test ON UPDATE CASCADE logic
INSERT INTO another_citus_local_table VALUES (600);
INSERT INTO citus_local_table VALUES (600);
CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON another_citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();
CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();
UPDATE another_citus_local_table SET value=value-1;;
-- we should see two rows with "100"
SELECT * FROM reference_table;
SELECT * FROM local_table;
ROLLBACK;
-- can perform local execution from a trigger on a Citus local table
BEGIN;
SELECT citus_add_local_table_to_metadata('local_table');
-- update should actually update something to test ON UPDATE CASCADE logic
INSERT INTO another_citus_local_table VALUES (600);
INSERT INTO citus_local_table VALUES (600);
CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON another_citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();
CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();
UPDATE another_citus_local_table SET value=value-1;;
-- we should see two rows with "100"
SELECT * FROM local_table;
ROLLBACK;
-- test on partitioned citus local tables

View File

@ -65,17 +65,15 @@ ALTER FOREIGN TABLE public.foreign_table_newname VALIDATE CONSTRAINT check_c_2;
ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint IF EXISTS check_c_2;
-- trigger test
CREATE TABLE distributed_table(value int);
SELECT create_distributed_table('distributed_table', 'value');
CREATE TABLE table42(value int);
CREATE FUNCTION insert_42() RETURNS trigger AS $insert_42$
BEGIN
INSERT INTO distributed_table VALUES (42);
INSERT INTO table42 VALUES (42);
RETURN NEW;
END;
$insert_42$ LANGUAGE plpgsql;
CREATE TRIGGER insert_42_trigger
AFTER DELETE ON public.foreign_table_newname
FOR EACH ROW EXECUTE FUNCTION insert_42();
@ -83,14 +81,14 @@ FOR EACH ROW EXECUTE FUNCTION insert_42();
-- do the same pattern from the workers as well
INSERT INTO public.foreign_table_newname VALUES (99, 'test_2');
delete from public.foreign_table_newname where id_test = 99;
select * from distributed_table ORDER BY value;
select * from table42 ORDER BY value;
-- disable trigger
alter foreign table public.foreign_table_newname disable trigger insert_42_trigger;
INSERT INTO public.foreign_table_newname VALUES (99, 'test_2');
delete from public.foreign_table_newname where id_test = 99;
-- should not insert again as trigger disabled
select * from distributed_table ORDER BY value;
select * from table42 ORDER BY value;
DROP TRIGGER insert_42_trigger ON public.foreign_table_newname;

View File

@ -100,9 +100,18 @@ BEGIN
x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
END;$$;
CREATE PROCEDURE mx_call_proc_copy(x int)
LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO multi_mx_call.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s;
END;$$;
-- Test that undistributed procedures have no issue executing
call multi_mx_call.mx_call_proc(2, 0);
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
call multi_mx_call.mx_call_proc_copy(2);
-- Same for unqualified names
call mx_call_proc(2, 0);
@ -112,6 +121,7 @@ call mx_call_proc_custom_types('S', 'A');
select create_distributed_function('mx_call_proc(int,int)');
select create_distributed_function('mx_call_proc_bigint(bigint,bigint)');
select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_call_enum)');
select create_distributed_function('mx_call_proc_copy(int)');
-- We still don't route them to the workers, because they aren't
-- colocated with any distributed tables.
@ -119,16 +129,19 @@ SET client_min_messages TO DEBUG1;
call multi_mx_call.mx_call_proc(2, 0);
call mx_call_proc_bigint(4, 2);
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
call multi_mx_call.mx_call_proc_copy(2);
-- Mark them as colocated with a table. Now we should route them to workers.
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
select colocate_proc_with_table('mx_call_proc_bigint', 'mx_call_dist_table_bigint'::regclass, 1);
select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
select colocate_proc_with_table('mx_call_proc_copy', 'mx_call_dist_table_1'::regclass, 0);
call multi_mx_call.mx_call_proc(2, 0);
call multi_mx_call.mx_call_proc_custom_types('S', 'A');
call mx_call_proc(2, 0);
call mx_call_proc_custom_types('S', 'A');
call mx_call_proc_copy(2);
-- Test implicit cast of int to bigint
call mx_call_proc_bigint(4, 2);

View File

@ -67,9 +67,21 @@ BEGIN
x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
END;$$;
-- function which internally uses COPY protocol without remote execution
CREATE FUNCTION mx_call_func_copy(x int)
RETURNS bool
LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s;
RETURN true;
END;$$;
-- Test that undistributed functions have no issue executing
select multi_mx_function_call_delegation.mx_call_func(2, 0);
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
select multi_mx_function_call_delegation.mx_call_copy(2);
select squares(4);
-- Same for unqualified name
@ -79,6 +91,7 @@ select mx_call_func(2, 0);
select create_distributed_function('mx_call_func(int,int)');
select create_distributed_function('mx_call_func_bigint(bigint,bigint)');
select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_call_enum)');
select create_distributed_function('mx_call_func_copy(int)');
select create_distributed_function('squares(int)');
@ -249,10 +262,15 @@ select mx_call_func(floor(random())::int, 2);
-- test forms we don't distribute
select * from mx_call_func(2, 0);
select mx_call_func(2, 0) from mx_call_dist_table_1;
select mx_call_func(2, 0) where mx_call_func(0, 2) = 0;
select mx_call_func(2, 0), mx_call_func(0, 2);
-- we do not delegate the call, but do push down the query
-- that result in remote execution from workers
select mx_call_func(id, 0) from mx_call_dist_table_1;
select mx_call_func(2, 0) from mx_call_dist_table_1 where id = 3;
select mx_call_func_copy(2) from mx_call_dist_table_1 where id = 3;
DO $$ BEGIN perform mx_call_func_tbl(40); END; $$;
SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val;