From ee3b50b026be27604b96573f80a1c5dc280d60d0 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 6 Jan 2022 13:58:33 +0100 Subject: [PATCH] Disallow remote execution from queries on shards --- src/backend/distributed/commands/call.c | 36 ++++--- src/backend/distributed/commands/multi_copy.c | 8 ++ .../distributed/commands/utility_hook.c | 10 ++ .../distributed/executor/adaptive_executor.c | 6 ++ .../distributed/executor/multi_executor.c | 45 +++++++++ .../planner/function_call_delegation.c | 34 +++++-- .../test/run_from_same_connection.c | 6 ++ .../transaction/transaction_management.c | 2 + .../distributed/commands/utility_hook.h | 2 + .../distributed/function_call_delegation.h | 8 ++ src/include/distributed/multi_executor.h | 2 + .../expected/citus_local_table_triggers.out | 93 ++++++++++++++++--- .../regress/expected/foreign_tables_mx.out | 14 +-- src/test/regress/expected/multi_mx_call.out | 84 ++++++++++++++--- src/test/regress/expected/multi_mx_call_0.out | 84 ++++++++++++++--- .../multi_mx_function_call_delegation.out | 60 ++++++++---- .../multi_mx_function_call_delegation_0.out | 60 ++++++++---- .../sql/citus_local_table_triggers.sql | 51 +++++++++- src/test/regress/sql/foreign_tables_mx.sql | 10 +- src/test/regress/sql/multi_mx_call.sql | 13 +++ .../sql/multi_mx_function_call_delegation.sql | 20 +++- 21 files changed, 531 insertions(+), 117 deletions(-) diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index dda4eb3de..af319f0ce 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -24,6 +24,7 @@ #include "distributed/commands/utility_hook.h" #include "distributed/connection_management.h" #include "distributed/deparse_shard_query.h" +#include "distributed/function_call_delegation.h" #include "distributed/metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" @@ -46,9 +47,10 @@ #include "utils/lsyscache.h" #include "utils/syscache.h" -static bool CallFuncExprRemotely(CallStmt *callStmt, - DistObjectCacheEntry *procedure, - FuncExpr *funcExpr, DestReceiver *dest); + +/* global variable tracking whether we are in a delegated procedure call */ +bool InDelegatedProcedureCall = false; + /* * CallDistributedProcedureRemotely calls a stored procedure on the worker if possible. @@ -61,28 +63,21 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest) DistObjectCacheEntry *procedure = LookupDistObjectCacheEntry(ProcedureRelationId, functionId, 0); - - /* - * If procedure is not distributed or already delegated from another - * node, do not call the procedure remotely. - */ - if (procedure == NULL || !procedure->isDistributed || - IsCitusInitiatedRemoteBackend()) + if (procedure == NULL || !procedure->isDistributed) { return false; } - return CallFuncExprRemotely(callStmt, procedure, funcExpr, dest); -} + if (IsCitusInitiatedRemoteBackend()) + { + /* + * We are in a citus-initiated backend handling a CALL to a distributed + * procedure. That means that this is the delegated call. + */ + InDelegatedProcedureCall = true; + return false; + } - -/* - * CallFuncExprRemotely calls a procedure of function on the worker if possible. - */ -static bool -CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, - FuncExpr *funcExpr, DestReceiver *dest) -{ if (IsMultiStatementTransaction()) { ereport(DEBUG1, (errmsg("cannot push down CALL in multi-statement transaction"))); @@ -102,6 +97,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, "be constant expressions"))); return false; } + CitusTableCacheEntry *distTable = GetCitusTableCacheEntry(colocatedRelationId); Var *partitionColumn = distTable->partitionColumn; bool colocatedWithReferenceTable = false; diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index d683a2792..d2d7d9b23 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -3340,6 +3340,7 @@ InitializeCopyShardState(CopyShardState *shardState, { ListCell *placementCell = NULL; int failedPlacementCount = 0; + bool hasRemoteCopy = false; MemoryContext localContext = AllocSetContextCreateExtended(CurrentMemoryContext, @@ -3383,6 +3384,8 @@ InitializeCopyShardState(CopyShardState *shardState, continue; } + hasRemoteCopy = true; + MultiConnection *connection = CopyGetPlacementConnection(connectionStateHash, placement, colocatedIntermediateResult); @@ -3427,6 +3430,11 @@ InitializeCopyShardState(CopyShardState *shardState, ereport(ERROR, (errmsg("could not connect to any active placements"))); } + if (hasRemoteCopy) + { + EnsureRemoteTaskExecutionAllowed(); + } + /* * We just error out and code execution should never reach to this * point. This is the case for all tables. diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 672a7ce81..3e2a2d24b 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -227,10 +227,20 @@ multi_ProcessUtility(PlannedStmt *pstmt, params, queryEnv, dest, completionTag); StoredProcedureLevel -= 1; + + if (InDelegatedProcedureCall && StoredProcedureLevel == 0) + { + InDelegatedProcedureCall = false; + } } PG_CATCH(); { StoredProcedureLevel -= 1; + + if (InDelegatedProcedureCall && StoredProcedureLevel == 0) + { + InDelegatedProcedureCall = false; + } PG_RE_THROW(); } PG_END_TRY(); diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 0e9d96fd5..a90e49ced 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1290,6 +1290,12 @@ StartDistributedExecution(DistributedExecution *execution) */ RecordParallelRelationAccessForTaskList(execution->remoteAndLocalTaskList); } + + /* make sure we are not doing remote execution from within a task */ + if (execution->remoteTaskList != NIL) + { + EnsureRemoteTaskExecutionAllowed(); + } } diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 5a535043d..afde1328c 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -21,9 +21,11 @@ #include "distributed/citus_custom_scan.h" #include "distributed/commands/multi_copy.h" #include "distributed/commands/utility_hook.h" +#include "distributed/function_call_delegation.h" #include "distributed/insert_select_executor.h" #include "distributed/insert_select_planner.h" #include "distributed/listutils.h" +#include "distributed/local_executor.h" #include "distributed/coordinator_protocol.h" #include "distributed/multi_executor.h" #include "distributed/combine_query_planner.h" @@ -719,3 +721,46 @@ ExecutorBoundParams(void) Assert(ExecutorLevel > 0); return executorBoundParams; } + + +/* + * EnsureRemoteTaskExecutionAllowed ensures that we do not perform remote + * execution from within a task. That could happen when the user calls + * a function in a query that gets pushed down to the worker, and the + * function performs a query on a distributed table. + */ +void +EnsureRemoteTaskExecutionAllowed(void) +{ + if (!InTaskExecution()) + { + /* we are not within a task, distributed execution is allowed */ + return; + } + + ereport(ERROR, (errmsg("cannot execute a distributed query from a query on a " + "shard"))); +} + + +/* + * InTaskExecution determines whether we are currently in a task execution. + */ +bool +InTaskExecution(void) +{ + if (LocalExecutorLevel > 0) + { + /* in a local task */ + return true; + } + + /* + * Normally, any query execution within a citus-initiated backend + * is considered a task execution, but an exception is when we + * are in a delegated function/procedure call. + */ + return IsCitusInitiatedRemoteBackend() && + !InDelegatedFunctionCall && + !InDelegatedProcedureCall; +} diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 6618e49e1..39cf334c6 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -58,6 +58,10 @@ struct ParamWalkerContext static bool contain_param_walker(Node *node, void *context); +/* global variable keeping track of whether we are in a delegated function call */ +bool InDelegatedFunctionCall = false; + + /* * contain_param_walker scans node for Param nodes. * Ignore the return value, instead check context afterwards. @@ -112,15 +116,6 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) } int32 localGroupId = GetLocalGroupId(); - if (localGroupId != COORDINATOR_GROUP_ID && IsCitusInitiatedRemoteBackend()) - { - /* - * Do not delegate from workers if it is initiated by Citus already. - * It means that this function has already been delegated to this node. - */ - return NULL; - } - if (localGroupId == GROUP_ID_UPGRADING) { /* do not delegate while upgrading */ @@ -218,6 +213,27 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) ereport(DEBUG4, (errmsg("function is distributed"))); } + if (IsCitusInitiatedRemoteBackend()) + { + /* + * We are planning a call to a distributed function within a Citus backend, + * that means that this is the delegated call. + */ + InDelegatedFunctionCall = true; + return NULL; + } + + if (localGroupId != COORDINATOR_GROUP_ID) + { + /* + * We are calling a distributed function on a worker node. We currently + * only delegate from the coordinator. + * + * TODO: remove this restriction. + */ + return NULL; + } + /* * Cannot delegate functions for INSERT ... SELECT func(), since they require * coordinated transactions. diff --git a/src/backend/distributed/test/run_from_same_connection.c b/src/backend/distributed/test/run_from_same_connection.c index e0b7d806c..3b5f804b4 100644 --- a/src/backend/distributed/test/run_from_same_connection.c +++ b/src/backend/distributed/test/run_from_same_connection.c @@ -113,6 +113,12 @@ start_session_level_connection_to_node(PG_FUNCTION_ARGS) elog(ERROR, "failed to connect to %s:%d", nodeNameString, (int) nodePort); } + /* pretend we are a regular client to avoid citus-initiated backend checks */ + const char *setAppName = + "SET application_name TO run_commands_on_session_level_connection_to_node"; + + ExecuteCriticalRemoteCommand(singleConnection, setAppName); + PG_RETURN_VOID(); } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index ee9912fe9..4c4958015 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -23,6 +23,7 @@ #include "distributed/citus_safe_lib.h" #include "distributed/connection_management.h" #include "distributed/distributed_planner.h" +#include "distributed/function_call_delegation.h" #include "distributed/hash_helpers.h" #include "distributed/intermediate_results.h" #include "distributed/listutils.h" @@ -550,6 +551,7 @@ ResetGlobalVariables() ShouldCoordinatedTransactionUse2PC = false; TransactionModifiedNodeMetadata = false; MetadataSyncOnCommit = false; + InDelegatedFunctionCall = false; ResetWorkerErrorIndication(); } diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 9ead0df8b..1ee18a206 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -18,6 +18,7 @@ #include "tcop/utility.h" #include "distributed/coordinator_protocol.h" +#include "distributed/function_call_delegation.h" #include "distributed/version_compat.h" #include "distributed/worker_transaction.h" @@ -37,6 +38,7 @@ extern bool EnableAlterRolePropagation; extern bool EnableAlterRoleSetPropagation; extern bool EnableAlterDatabaseOwner; extern int UtilityHookLevel; +extern bool InDelegatedProcedureCall; /* diff --git a/src/include/distributed/function_call_delegation.h b/src/include/distributed/function_call_delegation.h index 865ac0ce1..7d3c61aea 100644 --- a/src/include/distributed/function_call_delegation.h +++ b/src/include/distributed/function_call_delegation.h @@ -15,6 +15,14 @@ #include "distributed/multi_physical_planner.h" +/* + * These flags keep track of whether the process is currently in a delegated + * function or procedure call. + */ +extern bool InDelegatedFunctionCall; +extern bool InDelegatedProcedureCall; + + PlannedStmt * TryToDelegateFunctionCall(DistributedPlanningContext *planContext); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 143f5a1c7..3648dbc1b 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -149,6 +149,8 @@ extern void ExtractParametersFromParamList(ParamListInfo paramListInfo, const char ***parameterValues, bool useOriginalCustomTypeOids); extern ParamListInfo ExecutorBoundParams(void); +extern void EnsureRemoteTaskExecutionAllowed(void); +extern bool InTaskExecution(void); #endif /* MULTI_EXECUTOR_H */ diff --git a/src/test/regress/expected/citus_local_table_triggers.out b/src/test/regress/expected/citus_local_table_triggers.out index ac6906282..1a269c649 100644 --- a/src/test/regress/expected/citus_local_table_triggers.out +++ b/src/test/regress/expected/citus_local_table_triggers.out @@ -392,6 +392,13 @@ BEGIN RETURN NEW; END; $insert_100$ LANGUAGE plpgsql; +CREATE TABLE local_table (value int); +CREATE FUNCTION insert_100_local() RETURNS trigger AS $insert_100$ +BEGIN + INSERT INTO local_table VALUES (100); + RETURN NEW; +END; +$insert_100$ LANGUAGE plpgsql; BEGIN; CREATE TRIGGER insert_100_trigger AFTER TRUNCATE ON another_citus_local_table @@ -416,6 +423,7 @@ NOTICE: executing the command locally: SELECT value FROM citus_local_table_trig (2 rows) ROLLBACK; +-- cannot perform remote execution from a trigger on a Citus local table BEGIN; -- update should actually update something to test ON UPDATE CASCADE logic INSERT INTO another_citus_local_table VALUES (600); @@ -436,11 +444,70 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1 FOR EACH STATEMENT EXECUTE FUNCTION insert_100();') UPDATE another_citus_local_table SET value=value-1;; NOTICE: executing the command locally: UPDATE citus_local_table_triggers.another_citus_local_table_1507009 another_citus_local_table SET value = (value OPERATOR(pg_catalog.-) 1) -NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.reference_table_1507010 (value) VALUES (100) -NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.reference_table_1507010 (value) VALUES (100) +ERROR: cannot execute a distributed query from a query on a shard +ROLLBACK; +-- can perform regular execution from a trigger on a Citus local table +BEGIN; + -- update should actually update something to test ON UPDATE CASCADE logic + INSERT INTO another_citus_local_table VALUES (600); +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.another_citus_local_table_1507009 (value) VALUES (600) + INSERT INTO citus_local_table VALUES (600); +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.citus_local_table_1507001 (value) VALUES (600) + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON another_citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507009, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON another_citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();') + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507001, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();') + UPDATE another_citus_local_table SET value=value-1;; +NOTICE: executing the command locally: UPDATE citus_local_table_triggers.another_citus_local_table_1507009 another_citus_local_table SET value = (value OPERATOR(pg_catalog.-) 1) -- we should see two rows with "100" - SELECT * FROM reference_table; -NOTICE: executing the command locally: SELECT value FROM citus_local_table_triggers.reference_table_1507010 reference_table + SELECT * FROM local_table; + value +--------------------------------------------------------------------- + 100 + 100 +(2 rows) + +ROLLBACK; +-- can perform local execution from a trigger on a Citus local table +BEGIN; + SELECT citus_add_local_table_to_metadata('local_table'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + + -- update should actually update something to test ON UPDATE CASCADE logic + INSERT INTO another_citus_local_table VALUES (600); +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.another_citus_local_table_1507009 (value) VALUES (600) + INSERT INTO citus_local_table VALUES (600); +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.citus_local_table_1507001 (value) VALUES (600) + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON another_citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507009, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON another_citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();') + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507001, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();') + UPDATE another_citus_local_table SET value=value-1;; +NOTICE: executing the command locally: UPDATE citus_local_table_triggers.another_citus_local_table_1507009 another_citus_local_table SET value = (value OPERATOR(pg_catalog.-) 1) +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.local_table_1507011 (value) VALUES (100) +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.local_table_1507011 (value) VALUES (100) + -- we should see two rows with "100" + SELECT * FROM local_table; +NOTICE: executing the command locally: SELECT value FROM citus_local_table_triggers.local_table_1507011 local_table value --------------------------------------------------------------------- 100 @@ -456,11 +523,11 @@ CREATE TABLE par_another_citus_local_table_1 PARTITION OF par_another_citus_loca ALTER TABLE par_another_citus_local_table ADD CONSTRAINT fkey_self FOREIGN KEY(val) REFERENCES par_another_citus_local_table(val); ALTER TABLE par_citus_local_table ADD CONSTRAINT fkey_c_to_c FOREIGN KEY(val) REFERENCES par_another_citus_local_table(val) ON UPDATE CASCADE; SELECT citus_add_local_table_to_metadata('par_another_citus_local_table', cascade_via_foreign_keys=>true); -NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507011, 'citus_local_table_triggers', 1507012, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_another_citus_local_table_1 FOR VALUES FROM (1) TO (10000);') -NOTICE: executing the command locally: SELECT pg_catalog.citus_run_local_command($$SELECT worker_fix_partition_shard_index_names('citus_local_table_triggers.par_another_citus_local_table_val_key_1507011'::regclass, 'citus_local_table_triggers.par_another_citus_local_table_1_1507012', 'par_another_citus_local_table_1_val_key_1507012')$$) -NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507013, 'citus_local_table_triggers', 1507014, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_citus_local_table_1 FOR VALUES FROM (1) TO (10000);') -NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507011, 'citus_local_table_triggers', 1507011, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ADD CONSTRAINT fkey_self FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val)') -NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507013, 'citus_local_table_triggers', 1507011, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ADD CONSTRAINT fkey_c_to_c FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val) ON UPDATE CASCADE') +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507012, 'citus_local_table_triggers', 1507013, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_another_citus_local_table_1 FOR VALUES FROM (1) TO (10000);') +NOTICE: executing the command locally: SELECT pg_catalog.citus_run_local_command($$SELECT worker_fix_partition_shard_index_names('citus_local_table_triggers.par_another_citus_local_table_val_key_1507012'::regclass, 'citus_local_table_triggers.par_another_citus_local_table_1_1507013', 'par_another_citus_local_table_1_val_key_1507013')$$) +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507014, 'citus_local_table_triggers', 1507015, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_citus_local_table_1 FOR VALUES FROM (1) TO (10000);') +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507012, 'citus_local_table_triggers', 1507012, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ADD CONSTRAINT fkey_self FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val)') +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507014, 'citus_local_table_triggers', 1507012, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ADD CONSTRAINT fkey_c_to_c FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val) ON UPDATE CASCADE') citus_add_local_table_to_metadata --------------------------------------------------------------------- @@ -489,7 +556,7 @@ BEGIN; TRUNCATE par_another_citus_local_table CASCADE; NOTICE: truncate cascades to table "par_citus_local_table" NOTICE: truncate cascades to table "par_citus_local_table_1" -NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507015 (val) VALUES (100) +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507016 (val) VALUES (100) NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_triggers.par_another_citus_local_table_xxxxx CASCADE NOTICE: truncate cascades to table "par_citus_local_table_xxxxx" NOTICE: truncate cascades to table "par_citus_local_table_1_xxxxx" @@ -497,12 +564,12 @@ NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_trigger NOTICE: truncate cascades to table "par_citus_local_table_xxxxx" NOTICE: truncate cascades to table "par_citus_local_table_1_xxxxx" NOTICE: truncate cascades to table "par_another_citus_local_table_xxxxx" -NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507015 (val) VALUES (100) +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507016 (val) VALUES (100) NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_triggers.par_citus_local_table_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_triggers.par_citus_local_table_1_xxxxx CASCADE -- we should see two rows with "100" SELECT * FROM par_reference_table; -NOTICE: executing the command locally: SELECT val FROM citus_local_table_triggers.par_reference_table_1507015 par_reference_table +NOTICE: executing the command locally: SELECT val FROM citus_local_table_triggers.par_reference_table_1507016 par_reference_table val --------------------------------------------------------------------- 100 @@ -512,4 +579,4 @@ NOTICE: executing the command locally: SELECT val FROM citus_local_table_trigge ROLLBACK; -- cleanup at exit DROP SCHEMA citus_local_table_triggers, "interesting!schema" CASCADE; -NOTICE: drop cascades to 20 other objects +NOTICE: drop cascades to 22 other objects diff --git a/src/test/regress/expected/foreign_tables_mx.out b/src/test/regress/expected/foreign_tables_mx.out index a3de72259..17b1c99b8 100644 --- a/src/test/regress/expected/foreign_tables_mx.out +++ b/src/test/regress/expected/foreign_tables_mx.out @@ -66,16 +66,10 @@ ALTER FOREIGN TABLE public.foreign_table_newname ADD CONSTRAINT check_c_2 check( ALTER FOREIGN TABLE public.foreign_table_newname VALIDATE CONSTRAINT check_c_2; ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint IF EXISTS check_c_2; -- trigger test -CREATE TABLE distributed_table(value int); -SELECT create_distributed_table('distributed_table', 'value'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - +CREATE TABLE table42(value int); CREATE FUNCTION insert_42() RETURNS trigger AS $insert_42$ BEGIN - INSERT INTO distributed_table VALUES (42); + INSERT INTO table42 VALUES (42); RETURN NEW; END; $insert_42$ LANGUAGE plpgsql; @@ -85,7 +79,7 @@ FOR EACH ROW EXECUTE FUNCTION insert_42(); -- do the same pattern from the workers as well INSERT INTO public.foreign_table_newname VALUES (99, 'test_2'); delete from public.foreign_table_newname where id_test = 99; -select * from distributed_table ORDER BY value; +select * from table42 ORDER BY value; value --------------------------------------------------------------------- 42 @@ -96,7 +90,7 @@ alter foreign table public.foreign_table_newname disable trigger insert_42_trigg INSERT INTO public.foreign_table_newname VALUES (99, 'test_2'); delete from public.foreign_table_newname where id_test = 99; -- should not insert again as trigger disabled -select * from distributed_table ORDER BY value; +select * from table42 ORDER BY value; value --------------------------------------------------------------------- 42 diff --git a/src/test/regress/expected/multi_mx_call.out b/src/test/regress/expected/multi_mx_call.out index 7f077c77a..64b033d41 100644 --- a/src/test/regress/expected/multi_mx_call.out +++ b/src/test/regress/expected/multi_mx_call.out @@ -131,6 +131,12 @@ BEGIN y := x; x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; +CREATE PROCEDURE mx_call_proc_copy(x int) +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_call.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s; +END;$$; -- Test that undistributed procedures have no issue executing call multi_mx_call.mx_call_proc(2, 0); y @@ -144,6 +150,7 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A'); F | S (1 row) +call multi_mx_call.mx_call_proc_copy(2); -- Same for unqualified names call mx_call_proc(2, 0); y @@ -176,6 +183,12 @@ select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_ca (1 row) +select create_distributed_function('mx_call_proc_copy(int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + -- We still don't route them to the workers, because they aren't -- colocated with any distributed tables. SET client_min_messages TO DEBUG1; @@ -206,6 +219,12 @@ DEBUG: stored procedure does not have co-located tables F | S (1 row) +call multi_mx_call.mx_call_proc_copy(2); +DEBUG: stored procedure does not have co-located tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +CONTEXT: SQL statement "INSERT INTO multi_mx_call.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s" +PL/pgSQL function mx_call_proc_copy(integer) line XX at SQL statement -- Mark them as colocated with a table. Now we should route them to workers. select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); colocate_proc_with_table @@ -225,6 +244,12 @@ select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table (1 row) +select colocate_proc_with_table('mx_call_proc_copy', 'mx_call_dist_table_1'::regclass, 0); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + call multi_mx_call.mx_call_proc(2, 0); DEBUG: pushing down the procedure y @@ -253,6 +278,8 @@ DEBUG: pushing down the procedure S | S (1 row) +call mx_call_proc_copy(2); +DEBUG: pushing down the procedure -- Test implicit cast of int to bigint call mx_call_proc_bigint(4, 2); DEBUG: pushing down the procedure @@ -398,18 +425,51 @@ DETAIL: A distributed function is created. To make sure subsequent commands see CALL multi_mx_call.mx_call_proc_tx(20); DEBUG: pushing down the procedure SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val; - id | val + id | val --------------------------------------------------------------------- - 3 | 1 - 3 | 5 - 4 | 5 - 6 | 5 - 9 | 2 - 10 | -2 - 11 | 3 - 20 | -2 - 21 | 3 -(9 rows) + 3 | 1 + 3 | 5 + 4 | 5 + 6 | 5 + 9 | 2 + 10 | -2 + 11 | 3 + 20 | -2 + 21 | 3 + 100 | 98 + 100 | 98 + 100 | 98 + 101 | 99 + 101 | 99 + 101 | 99 + 102 | 100 + 102 | 100 + 102 | 100 + 103 | 101 + 103 | 101 + 103 | 101 + 104 | 102 + 104 | 102 + 104 | 102 + 105 | 103 + 105 | 103 + 105 | 103 + 106 | 104 + 106 | 104 + 106 | 104 + 107 | 105 + 107 | 105 + 107 | 105 + 108 | 106 + 108 | 106 + 108 | 106 + 109 | 107 + 109 | 107 + 109 | 107 + 110 | 108 + 110 | 108 + 110 | 108 +(42 rows) -- Show that function delegation works from worker nodes as well \c - - - :worker_1_port @@ -539,4 +599,4 @@ PL/pgSQL function mx_call_proc(integer,integer) line XX at assignment reset client_min_messages; \set VERBOSITY terse drop schema multi_mx_call cascade; -NOTICE: drop cascades to 13 other objects +NOTICE: drop cascades to 14 other objects diff --git a/src/test/regress/expected/multi_mx_call_0.out b/src/test/regress/expected/multi_mx_call_0.out index 77667f75b..496e735c9 100644 --- a/src/test/regress/expected/multi_mx_call_0.out +++ b/src/test/regress/expected/multi_mx_call_0.out @@ -131,6 +131,12 @@ BEGIN y := x; x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; +CREATE PROCEDURE mx_call_proc_copy(x int) +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_call.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s; +END;$$; -- Test that undistributed procedures have no issue executing call multi_mx_call.mx_call_proc(2, 0); y @@ -144,6 +150,7 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A'); F | S (1 row) +call multi_mx_call.mx_call_proc_copy(2); -- Same for unqualified names call mx_call_proc(2, 0); y @@ -176,6 +183,12 @@ select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_ca (1 row) +select create_distributed_function('mx_call_proc_copy(int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + -- We still don't route them to the workers, because they aren't -- colocated with any distributed tables. SET client_min_messages TO DEBUG1; @@ -206,6 +219,12 @@ DEBUG: stored procedure does not have co-located tables F | S (1 row) +call multi_mx_call.mx_call_proc_copy(2); +DEBUG: stored procedure does not have co-located tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +CONTEXT: SQL statement "INSERT INTO multi_mx_call.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s" +PL/pgSQL function mx_call_proc_copy(integer) line XX at SQL statement -- Mark them as colocated with a table. Now we should route them to workers. select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); colocate_proc_with_table @@ -225,6 +244,12 @@ select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table (1 row) +select colocate_proc_with_table('mx_call_proc_copy', 'mx_call_dist_table_1'::regclass, 0); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + call multi_mx_call.mx_call_proc(2, 0); DEBUG: pushing down the procedure y @@ -253,6 +278,8 @@ DEBUG: pushing down the procedure S | S (1 row) +call mx_call_proc_copy(2); +DEBUG: pushing down the procedure -- Test implicit cast of int to bigint call mx_call_proc_bigint(4, 2); DEBUG: pushing down the procedure @@ -398,18 +425,51 @@ DETAIL: A distributed function is created. To make sure subsequent commands see CALL multi_mx_call.mx_call_proc_tx(20); DEBUG: pushing down the procedure SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val; - id | val + id | val --------------------------------------------------------------------- - 3 | 1 - 3 | 5 - 4 | 5 - 6 | 5 - 9 | 2 - 10 | -2 - 11 | 3 - 20 | -2 - 21 | 3 -(9 rows) + 3 | 1 + 3 | 5 + 4 | 5 + 6 | 5 + 9 | 2 + 10 | -2 + 11 | 3 + 20 | -2 + 21 | 3 + 100 | 98 + 100 | 98 + 100 | 98 + 101 | 99 + 101 | 99 + 101 | 99 + 102 | 100 + 102 | 100 + 102 | 100 + 103 | 101 + 103 | 101 + 103 | 101 + 104 | 102 + 104 | 102 + 104 | 102 + 105 | 103 + 105 | 103 + 105 | 103 + 106 | 104 + 106 | 104 + 106 | 104 + 107 | 105 + 107 | 105 + 107 | 105 + 108 | 106 + 108 | 106 + 108 | 106 + 109 | 107 + 109 | 107 + 109 | 107 + 110 | 108 + 110 | 108 + 110 | 108 +(42 rows) -- Show that function delegation works from worker nodes as well \c - - - :worker_1_port @@ -539,4 +599,4 @@ PL/pgSQL function mx_call_proc(integer,integer) line XX at assignment reset client_min_messages; \set VERBOSITY terse drop schema multi_mx_call cascade; -NOTICE: drop cascades to 13 other objects +NOTICE: drop cascades to 14 other objects diff --git a/src/test/regress/expected/multi_mx_function_call_delegation.out b/src/test/regress/expected/multi_mx_function_call_delegation.out index 4fd13cee8..e77e0c3b5 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation.out @@ -83,6 +83,16 @@ BEGIN y := x; x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; +-- function which internally uses COPY protocol without remote execution +CREATE FUNCTION mx_call_func_copy(x int) +RETURNS bool +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s; + + RETURN true; +END;$$; -- Test that undistributed functions have no issue executing select multi_mx_function_call_delegation.mx_call_func(2, 0); mx_call_func @@ -96,6 +106,9 @@ select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); (F,S) (1 row) +select multi_mx_function_call_delegation.mx_call_copy(2); +ERROR: function multi_mx_function_call_delegation.mx_call_copy(integer) does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. select squares(4); squares --------------------------------------------------------------------- @@ -131,6 +144,12 @@ select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_ca (1 row) +select create_distributed_function('mx_call_func_copy(int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + select create_distributed_function('squares(int)'); create_distributed_function --------------------------------------------------------------------- @@ -604,20 +623,6 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment 29 (1 row) -select mx_call_func(2, 0) from mx_call_dist_table_1; - mx_call_func ---------------------------------------------------------------------- - 28 - 28 - 28 - 28 - 28 - 28 - 28 - 28 - 28 -(9 rows) - select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" @@ -647,6 +652,24 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment 29 | 27 (1 row) +-- we do not delegate the call, but do push down the query +-- that result in remote execution from workers +select mx_call_func(id, 0) from mx_call_dist_table_1; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment +while executing command on localhost:xxxxx +select mx_call_func(2, 0) from mx_call_dist_table_1 where id = 3; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment +while executing command on localhost:xxxxx +select mx_call_func_copy(2) from mx_call_dist_table_1 where id = 3; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s" +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_copy(integer) line XX at SQL statement +while executing command on localhost:xxxxx DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; DEBUG: not pushing down function calls in a multi-statement transaction CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)" @@ -725,7 +748,12 @@ HINT: Connect to the coordinator and run it again. -- show that functions can be delegated from worker nodes SET client_min_messages TO DEBUG1; SELECT mx_call_func(2, 0); -DEBUG: pushing down the function call +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line XX at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((2 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 28 @@ -736,4 +764,4 @@ SET search_path TO multi_mx_function_call_delegation, public; RESET client_min_messages; \set VERBOSITY terse DROP SCHEMA multi_mx_function_call_delegation CASCADE; -NOTICE: drop cascades to 14 other objects +NOTICE: drop cascades to 15 other objects diff --git a/src/test/regress/expected/multi_mx_function_call_delegation_0.out b/src/test/regress/expected/multi_mx_function_call_delegation_0.out index 70672b455..657183bc2 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation_0.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation_0.out @@ -83,6 +83,16 @@ BEGIN y := x; x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; +-- function which internally uses COPY protocol without remote execution +CREATE FUNCTION mx_call_func_copy(x int) +RETURNS bool +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s; + + RETURN true; +END;$$; -- Test that undistributed functions have no issue executing select multi_mx_function_call_delegation.mx_call_func(2, 0); mx_call_func @@ -96,6 +106,9 @@ select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); (F,S) (1 row) +select multi_mx_function_call_delegation.mx_call_copy(2); +ERROR: function multi_mx_function_call_delegation.mx_call_copy(integer) does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. select squares(4); squares --------------------------------------------------------------------- @@ -131,6 +144,12 @@ select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_ca (1 row) +select create_distributed_function('mx_call_func_copy(int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + select create_distributed_function('squares(int)'); create_distributed_function --------------------------------------------------------------------- @@ -604,20 +623,6 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment 29 (1 row) -select mx_call_func(2, 0) from mx_call_dist_table_1; - mx_call_func ---------------------------------------------------------------------- - 28 - 28 - 28 - 28 - 28 - 28 - 28 - 28 - 28 -(9 rows) - select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" @@ -647,6 +652,24 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment 29 | 27 (1 row) +-- we do not delegate the call, but do push down the query +-- that result in remote execution from workers +select mx_call_func(id, 0) from mx_call_dist_table_1; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment +while executing command on localhost:xxxxx +select mx_call_func(2, 0) from mx_call_dist_table_1 where id = 3; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment +while executing command on localhost:xxxxx +select mx_call_func_copy(2) from mx_call_dist_table_1 where id = 3; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s" +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_copy(integer) line XX at SQL statement +while executing command on localhost:xxxxx DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; DEBUG: not pushing down function calls in a multi-statement transaction CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)" @@ -725,7 +748,12 @@ HINT: Connect to the coordinator and run it again. -- show that functions can be delegated from worker nodes SET client_min_messages TO DEBUG1; SELECT mx_call_func(2, 0); -DEBUG: pushing down the function call +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line XX at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (2 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 28 @@ -736,4 +764,4 @@ SET search_path TO multi_mx_function_call_delegation, public; RESET client_min_messages; \set VERBOSITY terse DROP SCHEMA multi_mx_function_call_delegation CASCADE; -NOTICE: drop cascades to 14 other objects +NOTICE: drop cascades to 15 other objects diff --git a/src/test/regress/sql/citus_local_table_triggers.sql b/src/test/regress/sql/citus_local_table_triggers.sql index d091c498b..76b192388 100644 --- a/src/test/regress/sql/citus_local_table_triggers.sql +++ b/src/test/regress/sql/citus_local_table_triggers.sql @@ -268,6 +268,15 @@ BEGIN END; $insert_100$ LANGUAGE plpgsql; +CREATE TABLE local_table (value int); + +CREATE FUNCTION insert_100_local() RETURNS trigger AS $insert_100$ +BEGIN + INSERT INTO local_table VALUES (100); + RETURN NEW; +END; +$insert_100$ LANGUAGE plpgsql; + BEGIN; CREATE TRIGGER insert_100_trigger AFTER TRUNCATE ON another_citus_local_table @@ -282,7 +291,7 @@ BEGIN; SELECT * FROM reference_table; ROLLBACK; - +-- cannot perform remote execution from a trigger on a Citus local table BEGIN; -- update should actually update something to test ON UPDATE CASCADE logic INSERT INTO another_citus_local_table VALUES (600); @@ -296,9 +305,47 @@ BEGIN; AFTER UPDATE ON citus_local_table FOR EACH STATEMENT EXECUTE FUNCTION insert_100(); + UPDATE another_citus_local_table SET value=value-1;; +ROLLBACK; + +-- can perform regular execution from a trigger on a Citus local table +BEGIN; + -- update should actually update something to test ON UPDATE CASCADE logic + INSERT INTO another_citus_local_table VALUES (600); + INSERT INTO citus_local_table VALUES (600); + + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON another_citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); + + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); + UPDATE another_citus_local_table SET value=value-1;; -- we should see two rows with "100" - SELECT * FROM reference_table; + SELECT * FROM local_table; +ROLLBACK; + +-- can perform local execution from a trigger on a Citus local table +BEGIN; + SELECT citus_add_local_table_to_metadata('local_table'); + + -- update should actually update something to test ON UPDATE CASCADE logic + INSERT INTO another_citus_local_table VALUES (600); + INSERT INTO citus_local_table VALUES (600); + + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON another_citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); + + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); + + UPDATE another_citus_local_table SET value=value-1;; + -- we should see two rows with "100" + SELECT * FROM local_table; ROLLBACK; -- test on partitioned citus local tables diff --git a/src/test/regress/sql/foreign_tables_mx.sql b/src/test/regress/sql/foreign_tables_mx.sql index a7c3138df..fdd391d10 100644 --- a/src/test/regress/sql/foreign_tables_mx.sql +++ b/src/test/regress/sql/foreign_tables_mx.sql @@ -65,17 +65,15 @@ ALTER FOREIGN TABLE public.foreign_table_newname VALIDATE CONSTRAINT check_c_2; ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint IF EXISTS check_c_2; -- trigger test -CREATE TABLE distributed_table(value int); -SELECT create_distributed_table('distributed_table', 'value'); +CREATE TABLE table42(value int); CREATE FUNCTION insert_42() RETURNS trigger AS $insert_42$ BEGIN - INSERT INTO distributed_table VALUES (42); + INSERT INTO table42 VALUES (42); RETURN NEW; END; $insert_42$ LANGUAGE plpgsql; - CREATE TRIGGER insert_42_trigger AFTER DELETE ON public.foreign_table_newname FOR EACH ROW EXECUTE FUNCTION insert_42(); @@ -83,14 +81,14 @@ FOR EACH ROW EXECUTE FUNCTION insert_42(); -- do the same pattern from the workers as well INSERT INTO public.foreign_table_newname VALUES (99, 'test_2'); delete from public.foreign_table_newname where id_test = 99; -select * from distributed_table ORDER BY value; +select * from table42 ORDER BY value; -- disable trigger alter foreign table public.foreign_table_newname disable trigger insert_42_trigger; INSERT INTO public.foreign_table_newname VALUES (99, 'test_2'); delete from public.foreign_table_newname where id_test = 99; -- should not insert again as trigger disabled -select * from distributed_table ORDER BY value; +select * from table42 ORDER BY value; DROP TRIGGER insert_42_trigger ON public.foreign_table_newname; diff --git a/src/test/regress/sql/multi_mx_call.sql b/src/test/regress/sql/multi_mx_call.sql index 7df194ea7..4728b8948 100644 --- a/src/test/regress/sql/multi_mx_call.sql +++ b/src/test/regress/sql/multi_mx_call.sql @@ -100,9 +100,18 @@ BEGIN x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; +CREATE PROCEDURE mx_call_proc_copy(x int) +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_call.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s; +END;$$; + + -- Test that undistributed procedures have no issue executing call multi_mx_call.mx_call_proc(2, 0); call multi_mx_call.mx_call_proc_custom_types('S', 'A'); +call multi_mx_call.mx_call_proc_copy(2); -- Same for unqualified names call mx_call_proc(2, 0); @@ -112,6 +121,7 @@ call mx_call_proc_custom_types('S', 'A'); select create_distributed_function('mx_call_proc(int,int)'); select create_distributed_function('mx_call_proc_bigint(bigint,bigint)'); select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_call_enum)'); +select create_distributed_function('mx_call_proc_copy(int)'); -- We still don't route them to the workers, because they aren't -- colocated with any distributed tables. @@ -119,16 +129,19 @@ SET client_min_messages TO DEBUG1; call multi_mx_call.mx_call_proc(2, 0); call mx_call_proc_bigint(4, 2); call multi_mx_call.mx_call_proc_custom_types('S', 'A'); +call multi_mx_call.mx_call_proc_copy(2); -- Mark them as colocated with a table. Now we should route them to workers. select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); select colocate_proc_with_table('mx_call_proc_bigint', 'mx_call_dist_table_bigint'::regclass, 1); select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1); +select colocate_proc_with_table('mx_call_proc_copy', 'mx_call_dist_table_1'::regclass, 0); call multi_mx_call.mx_call_proc(2, 0); call multi_mx_call.mx_call_proc_custom_types('S', 'A'); call mx_call_proc(2, 0); call mx_call_proc_custom_types('S', 'A'); +call mx_call_proc_copy(2); -- Test implicit cast of int to bigint call mx_call_proc_bigint(4, 2); diff --git a/src/test/regress/sql/multi_mx_function_call_delegation.sql b/src/test/regress/sql/multi_mx_function_call_delegation.sql index b2d26e853..4dfe91322 100644 --- a/src/test/regress/sql/multi_mx_function_call_delegation.sql +++ b/src/test/regress/sql/multi_mx_function_call_delegation.sql @@ -67,9 +67,21 @@ BEGIN x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; +-- function which internally uses COPY protocol without remote execution +CREATE FUNCTION mx_call_func_copy(x int) +RETURNS bool +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s; + + RETURN true; +END;$$; + -- Test that undistributed functions have no issue executing select multi_mx_function_call_delegation.mx_call_func(2, 0); select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); +select multi_mx_function_call_delegation.mx_call_copy(2); select squares(4); -- Same for unqualified name @@ -79,6 +91,7 @@ select mx_call_func(2, 0); select create_distributed_function('mx_call_func(int,int)'); select create_distributed_function('mx_call_func_bigint(bigint,bigint)'); select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_call_enum)'); +select create_distributed_function('mx_call_func_copy(int)'); select create_distributed_function('squares(int)'); @@ -249,10 +262,15 @@ select mx_call_func(floor(random())::int, 2); -- test forms we don't distribute select * from mx_call_func(2, 0); -select mx_call_func(2, 0) from mx_call_dist_table_1; select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; select mx_call_func(2, 0), mx_call_func(0, 2); +-- we do not delegate the call, but do push down the query +-- that result in remote execution from workers +select mx_call_func(id, 0) from mx_call_dist_table_1; +select mx_call_func(2, 0) from mx_call_dist_table_1 where id = 3; +select mx_call_func_copy(2) from mx_call_dist_table_1 where id = 3; + DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val;